// kafkit-client 0.1.9
//
// Kafka 4.0+ pure Rust client.
// Documentation
use std::collections::{BTreeMap, HashMap};

use kafka_protocol::messages::share_acknowledge_request::AcknowledgementBatch as ShareAcknowledgeBatch;
use kafka_protocol::messages::share_fetch_request::AcknowledgementBatch as ShareFetchAcknowledgementBatch;

use super::types::{AcknowledgeType, ShareAcknowledgementCommit, TopicIdPartitionKey};

/// Run-length threshold used by `optimise_acknowledgement_batch`: a run of
/// consecutive identical acknowledge types that grows beyond this many entries
/// is carved out of its batch and emitted as a separate single-type batch.
const MAX_RECORDS_WITH_SAME_ACKNOWLEDGE_TYPE: usize = 10;

/// Acknowledge types collected per offset, ready to be turned into protocol
/// acknowledgement batches.
#[derive(Debug, Clone, Default)]
pub(super) struct Acknowledgements {
    /// Acknowledge type for each offset. The `BTreeMap` keeps offsets in
    /// ascending order, which the batching code relies on to detect contiguous
    /// ranges. A `None` value is encoded as protocol value `0` when batches
    /// are built.
    pub(super) offsets: BTreeMap<i64, Option<AcknowledgeType>>,
}

impl Acknowledgements {
    pub(super) fn add(&mut self, offset: i64, acknowledge_type: AcknowledgeType) {
        self.offsets.insert(offset, Some(acknowledge_type));
    }

    pub(super) fn is_empty(&self) -> bool {
        self.offsets.is_empty()
    }

    pub(super) fn extend(&mut self, acknowledgements: Acknowledgements) {
        self.offsets.extend(acknowledgements.offsets);
    }

    pub(super) fn into_share_fetch_batches(self) -> Vec<ShareFetchAcknowledgementBatch> {
        acknowledgement_batches(self.offsets)
            .into_iter()
            .map(|batch| {
                ShareFetchAcknowledgementBatch::default()
                    .with_first_offset(batch.first_offset)
                    .with_last_offset(batch.last_offset)
                    .with_acknowledge_types(batch.acknowledge_types)
            })
            .collect()
    }

    pub(super) fn into_share_acknowledge_batches(self) -> Vec<ShareAcknowledgeBatch> {
        acknowledgement_batches(self.offsets)
            .into_iter()
            .map(|batch| {
                ShareAcknowledgeBatch::default()
                    .with_first_offset(batch.first_offset)
                    .with_last_offset(batch.last_offset)
                    .with_acknowledge_types(batch.acknowledge_types)
            })
            .collect()
    }
}

/// Request-agnostic acknowledgement batch covering the inclusive offset range
/// `first_offset..=last_offset`.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so batches can be printed in
/// assertion failures and compared as whole values.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ProtocolAcknowledgementBatch {
    /// First offset covered by the batch (inclusive).
    first_offset: i64,
    /// Last offset covered by the batch (inclusive).
    last_offset: i64,
    /// Protocol values of the acknowledge types: either one entry per offset
    /// in the range, or a single entry when the batch has been collapsed
    /// because every offset shares the same type.
    acknowledge_types: Vec<i8>,
}

pub(super) fn share_acknowledgement_commits(
    acknowledgements: &HashMap<TopicIdPartitionKey, Acknowledgements>,
    error: Option<String>,
) -> Vec<ShareAcknowledgementCommit> {
    acknowledgements
        .iter()
        .filter_map(|(key, acks)| share_acknowledgement_commit(key, acks, error.clone()))
        .collect()
}

/// Builds the commit for a single partition.
///
/// Returns `None` when `acks` holds no offsets. Otherwise the commit copies
/// the topic name, topic id and partition from `key`, lists the offsets in
/// ascending order, and takes ownership of `error`.
pub(super) fn share_acknowledgement_commit(
    key: &TopicIdPartitionKey,
    acks: &Acknowledgements,
    error: Option<String>,
) -> Option<ShareAcknowledgementCommit> {
    if acks.offsets.is_empty() {
        return None;
    }

    let offsets: Vec<_> = acks.offsets.keys().copied().collect();
    Some(ShareAcknowledgementCommit {
        topic: key.topic.clone(),
        topic_id: key.topic_id,
        partition: key.partition,
        offsets,
        error,
    })
}

/// Groups per-offset acknowledgements into protocol batches.
///
/// Offsets are visited in ascending order. Consecutive offsets (each exactly
/// one greater than the previous) are gathered into one contiguous range; a
/// gap in the offsets closes the range. Every closed range is passed through
/// `optimise_acknowledgement_batch`, which may split or collapse it. An entry
/// whose acknowledge type is `None` is encoded as protocol value `0`.
fn acknowledgement_batches(
    offsets: BTreeMap<i64, Option<AcknowledgeType>>,
) -> Vec<ProtocolAcknowledgementBatch> {
    let mut finished = Vec::new();
    let mut open_range: Option<ProtocolAcknowledgementBatch> = None;

    for (offset, acknowledge_type) in offsets {
        let wire_type = acknowledge_type.map_or(0, AcknowledgeType::protocol_value);

        match open_range.as_mut() {
            // The offset directly follows the open range: grow it.
            Some(range) if range.last_offset + 1 == offset => {
                range.last_offset = offset;
                range.acknowledge_types.push(wire_type);
            }
            // First offset overall, or a gap: start a new range and flush the
            // previous one, if there was any.
            _ => {
                let fresh = ProtocolAcknowledgementBatch {
                    first_offset: offset,
                    last_offset: offset,
                    acknowledge_types: vec![wire_type],
                };
                if let Some(closed) = open_range.replace(fresh) {
                    finished.extend(optimise_acknowledgement_batch(closed));
                }
            }
        }
    }

    // Flush the range that was still open when the input ran out.
    if let Some(last) = open_range {
        finished.extend(optimise_acknowledgement_batch(last));
    }
    finished
}

/// Splits a contiguous batch so that long runs of one acknowledge type are
/// sent as compact single-type batches.
///
/// `batch.acknowledge_types` is expected to hold one entry per offset starting
/// at `batch.first_offset`; `batch.last_offset` is not read. The types are
/// scanned as runs of equal values:
///
/// * a run longer than `MAX_RECORDS_WITH_SAME_ACKNOWLEDGE_TYPE` becomes its
///   own batch carrying a single acknowledge type, preceded by a batch for any
///   not-yet-emitted entries in front of it;
/// * shorter runs stay together in a mixed batch with one type per offset.
///
/// Every resulting batch is finally passed through
/// `optimise_single_acknowledge_type`. An empty type list yields no batches.
fn optimise_acknowledgement_batch(
    batch: ProtocolAcknowledgementBatch,
) -> Vec<ProtocolAcknowledgementBatch> {
    let ProtocolAcknowledgementBatch {
        first_offset,
        acknowledge_types: types,
        ..
    } = batch;
    // Entry `index` of `types` belongs to offset `first_offset + index`.
    let offset_at = |index: usize| first_offset + index as i64;

    let mut pieces = Vec::new();
    // Index of the first entry that has not been emitted in any piece yet.
    let mut pending_start = 0usize;
    let mut run_start = 0usize;

    while run_start < types.len() {
        let run_type = types[run_start];
        let run_len = types[run_start..]
            .iter()
            .take_while(|&&value| value == run_type)
            .count();
        let run_end = run_start + run_len;

        if run_len > MAX_RECORDS_WITH_SAME_ACKNOWLEDGE_TYPE {
            // Entries waiting in front of the long run form their own batch.
            if pending_start < run_start {
                pieces.push(ProtocolAcknowledgementBatch {
                    first_offset: offset_at(pending_start),
                    last_offset: offset_at(run_start - 1),
                    acknowledge_types: types[pending_start..run_start].to_vec(),
                });
            }

            // The long run itself needs only one acknowledge type.
            pieces.push(ProtocolAcknowledgementBatch {
                first_offset: offset_at(run_start),
                last_offset: offset_at(run_end - 1),
                acknowledge_types: vec![run_type],
            });
            pending_start = run_end;
        }

        run_start = run_end;
    }

    // Whatever follows the last long run (or everything, if there was none).
    if pending_start < types.len() {
        pieces.push(ProtocolAcknowledgementBatch {
            first_offset: offset_at(pending_start),
            last_offset: offset_at(types.len() - 1),
            acknowledge_types: types[pending_start..].to_vec(),
        });
    }

    pieces
        .into_iter()
        .map(optimise_single_acknowledge_type)
        .collect()
}

/// Collapses a batch whose entries all share one acknowledge type.
///
/// When the batch lists more than one acknowledge type and they are all equal,
/// the list is reduced to that single value; the offset range is unchanged.
/// Any other batch is returned as is.
fn optimise_single_acknowledge_type(
    batch: ProtocolAcknowledgementBatch,
) -> ProtocolAcknowledgementBatch {
    let shared_type = match batch.acknowledge_types.as_slice() {
        [head, rest @ ..] if !rest.is_empty() && rest.iter().all(|value| value == head) => {
            Some(*head)
        }
        _ => None,
    };

    match shared_type {
        Some(only) => ProtocolAcknowledgementBatch {
            acknowledge_types: vec![only],
            ..batch
        },
        None => batch,
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    /// Flattens batches into `(first_offset, last_offset, acknowledge_types)`
    /// tuples so a whole result can be checked with one assertion.
    fn summarise(batches: &[ProtocolAcknowledgementBatch]) -> Vec<(i64, i64, Vec<i8>)> {
        batches
            .iter()
            .map(|batch| {
                (
                    batch.first_offset,
                    batch.last_offset,
                    batch.acknowledge_types.clone(),
                )
            })
            .collect()
    }

    // Offsets 4 and 5 are adjacent and share a batch; offset 9 starts a new one.
    #[test]
    fn acknowledgement_batches_group_contiguous_offsets() {
        let acks = BTreeMap::from([
            (4, Some(AcknowledgeType::Accept)),
            (5, Some(AcknowledgeType::Release)),
            (9, Some(AcknowledgeType::Reject)),
        ]);

        let batches = acknowledgement_batches(acks);

        assert_eq!(
            summarise(&batches),
            vec![(4, 5, vec![1, 2]), (9, 9, vec![3])]
        );
    }

    // A contiguous range with one acknowledge type collapses to a single entry.
    #[test]
    fn acknowledgement_batches_optimise_single_type_ranges() {
        let acks: BTreeMap<i64, Option<AcknowledgeType>> = (10..=15)
            .map(|offset| (offset, Some(AcknowledgeType::Accept)))
            .collect();

        let batches = acknowledgement_batches(acks);

        assert_eq!(summarise(&batches), vec![(10, 15, vec![1])]);
    }

    // Twelve identical types in the middle of a mixed range are split out.
    #[test]
    fn acknowledgement_batches_split_long_same_type_runs_inside_mixed_batch() {
        let mut acks: BTreeMap<i64, Option<AcknowledgeType>> = (1..=12)
            .map(|offset| (offset, Some(AcknowledgeType::Accept)))
            .collect();
        acks.insert(0, Some(AcknowledgeType::Release));
        acks.insert(13, Some(AcknowledgeType::Reject));

        let batches = acknowledgement_batches(acks);

        assert_eq!(
            summarise(&batches),
            vec![(0, 0, vec![2]), (1, 12, vec![1]), (13, 13, vec![3])]
        );
    }

    // The commit mirrors the partition key and lists the acknowledged offsets.
    #[test]
    fn acknowledgement_commits_copy_offsets_by_partition() {
        let topic_id = Uuid::new_v4();
        let key = TopicIdPartitionKey {
            topic_id,
            topic: "orders".to_owned(),
            partition: 2,
        };
        let mut acks = Acknowledgements::default();
        acks.add(4, AcknowledgeType::Accept);
        acks.add(7, AcknowledgeType::Release);
        let by_partition = HashMap::from([(key, acks)]);

        let commits = share_acknowledgement_commits(&by_partition, Some("failed".to_owned()));

        assert_eq!(commits.len(), 1);
        let commit = &commits[0];
        assert_eq!(commit.topic, "orders");
        assert_eq!(commit.topic_id, topic_id);
        assert_eq!(commit.partition, 2);
        assert_eq!(commit.offsets, vec![4, 7]);
        assert_eq!(commit.error.as_deref(), Some("failed"));
    }
}