rustfs-kafka 0.22.0

Rust client for Apache Kafka
Documentation
use kafka_protocol::messages::{ApiKey, ProduceRequest, ProduceResponse, RequestHeader, TopicName};
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::records::{Record, RecordBatchEncoder, RecordEncodeOptions, TimestampType};

use super::{to_kp_compression, API_VERSION_PRODUCE, HeaderResponse};
use crate::compression::Compression;
use crate::error::KafkaCode;
use crate::producer::{ProduceConfirm, ProducePartitionConfirm};

pub fn build_produce_request(
    correlation_id: i32,
    client_id: &str,
    required_acks: i16,
    timeout_ms: i32,
    compression: Compression,
    messages: &[(&str, i32, Option<&[u8]>, Option<&[u8]>)],
) -> (RequestHeader, ProduceRequest) {
    let header = RequestHeader::default()
        .with_client_id(Some(StrBytes::from_string(client_id.to_owned())))
        .with_request_api_key(ApiKey::Produce as i16)
        .with_request_api_version(API_VERSION_PRODUCE)
        .with_correlation_id(correlation_id);

    let mut topic_map: std::collections::HashMap<&str, std::collections::HashMap<i32, Vec<Record>>> =
        std::collections::HashMap::new();

    for (topic, partition, key, value) in messages {
        let record = Record {
            transactional: false,
            control: false,
            partition_leader_epoch: 0,
            producer_id: 0,
            producer_epoch: 0,
            timestamp_type: TimestampType::Creation,
            offset: 0,
            sequence: 0,
            timestamp: 0,
            key: key.map(bytes::Bytes::copy_from_slice),
            value: value.map(bytes::Bytes::copy_from_slice),
            headers: indexmap::IndexMap::new(),
        };
        topic_map
            .entry(topic)
            .or_default()
            .entry(*partition)
            .or_default()
            .push(record);
    }

    let topic_data: Vec<kafka_protocol::messages::produce_request::TopicProduceData> = topic_map
        .into_iter()
        .map(|(topic_name, partitions)| {
            let partition_data: Vec<kafka_protocol::messages::produce_request::PartitionProduceData> =
                partitions
                    .into_iter()
                    .map(|(partition_idx, records)| {
                        let mut buf = bytes::BytesMut::new();
                        let options = RecordEncodeOptions {
                            version: 2,
                            compression: to_kp_compression(compression),
                        };
                        RecordBatchEncoder::encode(&mut buf, &records, &options)
                            .expect("failed to encode record batch");

                        kafka_protocol::messages::produce_request::PartitionProduceData::default()
                            .with_index(partition_idx)
                            .with_records(Some(buf.freeze()))
                    })
                    .collect();

            kafka_protocol::messages::produce_request::TopicProduceData::default()
                .with_name(TopicName::from(StrBytes::from_string(topic_name.to_string())))
                .with_partition_data(partition_data)
        })
        .collect();

    let request = ProduceRequest::default()
        .with_transactional_id(None)
        .with_acks(required_acks)
        .with_timeout_ms(timeout_ms)
        .with_topic_data(topic_data);

    (header, request)
}

pub fn convert_produce_response(
    kp_resp: ProduceResponse,
    correlation_id: i32,
) -> ProduceResponseData {
    ProduceResponseData {
        header: HeaderResponse { correlation: correlation_id },
        topic_partitions: kp_resp
            .responses
            .into_iter()
            .map(|t| TopicPartitionProduceResponse {
                topic: t.name.to_string(),
                partitions: t
                    .partition_responses
                    .into_iter()
                    .map(|p| PartitionProduceResponse {
                        partition: p.index,
                        error: p.error_code,
                        offset: p.base_offset,
                    })
                    .collect(),
            })
            .collect(),
    }
}

// --------------------------------------------------------------------
// Data types (moved from old protocol/produce.rs)
// --------------------------------------------------------------------

#[allow(unused)]
#[derive(Debug, Copy, Clone)]
#[repr(u8)]
pub enum ProducerTimestamp {
    CreateTime = 0,
    LogAppendTime = 8,
}

#[derive(Default, Debug, Clone)]
pub struct ProduceResponseData {
    pub header: HeaderResponse,
    pub topic_partitions: Vec<TopicPartitionProduceResponse>,
}

#[derive(Default, Debug, Clone)]
pub struct TopicPartitionProduceResponse {
    pub topic: String,
    pub partitions: Vec<PartitionProduceResponse>,
}

#[derive(Default, Debug, Clone)]
pub struct PartitionProduceResponse {
    pub partition: i32,
    pub error: i16,
    pub offset: i64,
}

impl ProduceResponseData {
    pub fn get_response(self) -> Vec<ProduceConfirm> {
        self.topic_partitions
            .into_iter()
            .map(TopicPartitionProduceResponse::get_response)
            .collect()
    }
}

impl TopicPartitionProduceResponse {
    pub fn get_response(self) -> ProduceConfirm {
        let Self { topic, partitions } = self;
        let partition_confirms = partitions
            .iter()
            .map(PartitionProduceResponse::get_response)
            .collect();
        ProduceConfirm {
            topic,
            partition_confirms,
        }
    }
}

impl PartitionProduceResponse {
    pub fn get_response(&self) -> ProducePartitionConfirm {
        ProducePartitionConfirm {
            partition: self.partition,
            offset: match KafkaCode::from_protocol(self.error) {
                None => Ok(self.offset),
                Some(code) => Err(code),
            },
        }
    }
}