kafkit-client 0.1.0

Kafka 4.0+ pure Rust client.
Documentation
use std::collections::HashMap;

use crate::config::ProducerPartitioner;
use crate::metadata::PartitionMetadata;
use crate::types::{ProduceRecord, TopicPartitionKey};

use super::accumulator::RecordAccumulator;

/// State that the default partitioning strategy carries from one call to the next.
///
/// Both maps are keyed by topic name and start out empty (see the derived `Default`).
#[derive(Debug, Default)]
pub struct DefaultPartitionerState {
    /// Partition most recently selected for unkeyed records of each topic.
    sticky_by_topic: HashMap<String, i32>,
    /// Per-topic cursor used to pick the next partition when the sticky one is abandoned.
    next_index_by_topic: HashMap<String, usize>,
}

impl DefaultPartitionerState {
    /// Picks the partition that `record` should be sent to, according to `strategy`.
    ///
    /// `partitions` lists the candidate partitions as `(partition id, metadata)` pairs.
    /// `accumulator` and `batch_size` are only consulted when choosing a partition for an
    /// unkeyed record.
    pub fn partition(
        &mut self,
        strategy: ProducerPartitioner,
        record: &ProduceRecord,
        partitions: &[(i32, PartitionMetadata)],
        accumulator: &RecordAccumulator,
        batch_size: usize,
    ) -> i32 {
        // Kept as an exhaustive match so that a new strategy variant fails to compile here.
        match strategy {
            ProducerPartitioner::Default => {
                self.default_partition(record, partitions, accumulator, batch_size)
            }
        }
    }

    /// Applies the default strategy, trying each rule in order:
    ///
    /// 1. a partition set explicitly on the record is returned unchanged;
    /// 2. a record with a key is mapped to a partition by hashing the key;
    /// 3. any other record goes to the topic's sticky partition.
    fn default_partition(
        &mut self,
        record: &ProduceRecord,
        partitions: &[(i32, PartitionMetadata)],
        accumulator: &RecordAccumulator,
        batch_size: usize,
    ) -> i32 {
        if let Some(explicit) = record.partition {
            explicit
        } else if let Some(key) = record.key.as_ref() {
            keyed_partition(key, partitions)
        } else {
            self.sticky_partition(record, partitions, accumulator, batch_size)
        }
    }

    /// Chooses a partition for an unkeyed record.
    ///
    /// The partition remembered for the record's topic is reused for as long as the
    /// accumulator reports that it can still append a record of the estimated size to it.
    /// Otherwise the topic's cursor is advanced to the next entry of `partitions`, and that
    /// partition is remembered for subsequent calls.
    fn sticky_partition(
        &mut self,
        record: &ProduceRecord,
        partitions: &[(i32, PartitionMetadata)],
        accumulator: &RecordAccumulator,
        batch_size: usize,
    ) -> i32 {
        let topic = &record.topic;

        // Keep the remembered partition only if the accumulator can still take this record.
        let reusable = self
            .sticky_by_topic
            .get(topic)
            .copied()
            .filter(|&current| {
                let target = TopicPartitionKey::new(topic.clone(), current);
                accumulator.can_append_to(&target, batch_size, estimate_record_size(record))
            });
        if let Some(current) = reusable {
            return current;
        }

        // Nothing remembered, or no room left: move on and remember the new choice.
        let cursor = self.next_index_by_topic.entry(topic.clone()).or_insert(0);
        let chosen = next_partition(cursor, partitions);
        self.sticky_by_topic.insert(topic.clone(), chosen);
        chosen
    }
}

/// Returns the id of the next partition in round-robin order and advances the cursor.
///
/// The entry is selected by reducing `*next_index` modulo `partitions.len()`; the cursor is
/// then incremented for the following call.
///
/// # Panics
///
/// Panics if `partitions` is empty.
fn next_partition(next_index: &mut usize, partitions: &[(i32, PartitionMetadata)]) -> i32 {
    assert!(
        !partitions.is_empty(),
        "cannot pick a partition from an empty partition list"
    );
    let index = *next_index % partitions.len();
    // Wrap instead of saturating: a saturated cursor would stay at `usize::MAX` forever and
    // every later call would return the same partition, ending the rotation.
    *next_index = next_index.wrapping_add(1);
    partitions[index].0
}

/// Maps `key` to one of `partitions` by hashing it with FNV-1a and reducing the hash
/// modulo the number of partitions, so equal keys always yield the same partition id for a
/// given partition list.
///
/// # Panics
///
/// Panics if `partitions` is empty (the modulus would be zero).
fn keyed_partition(key: &[u8], partitions: &[(i32, PartitionMetadata)]) -> i32 {
    // Falls back to a modulus of 1 (always slot 0) if the length does not fit in a `u64`.
    let partition_count = u64::try_from(partitions.len()).unwrap_or(1);
    // The remainder is strictly smaller than the slice length, so it fits in `usize`.
    let slot = (fnv1a(key) % partition_count) as usize;
    let (partition_id, _) = &partitions[slot];
    *partition_id
}

/// Computes the 64-bit FNV-1a hash of `bytes`.
///
/// Starting from the offset basis, each byte is XOR-ed into the running hash, which is then
/// multiplied by the FNV prime with wrapping arithmetic.
fn fnv1a(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    bytes.iter().fold(OFFSET_BASIS, |acc, &byte| {
        (acc ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}

/// Estimates how many bytes `record` will occupy in a batch.
///
/// The estimate is the sum of the topic name length, the key and value lengths (absent
/// ones count as zero), the key and value lengths of every header, and a fixed 64-byte
/// allowance.
fn estimate_record_size(record: &ProduceRecord) -> usize {
    // Flat allowance added on top of the payload lengths.
    const FIXED_OVERHEAD: usize = 64;

    let key_len = record.key.as_ref().map_or(0, |key| key.len());
    let value_len = record.value.as_ref().map_or(0, |value| value.len());

    let mut headers_len = 0usize;
    for header in record.headers.iter() {
        headers_len += header.key.len();
        if let Some(value) = header.value.as_ref() {
            headers_len += value.len();
        }
    }

    record.topic.len() + key_len + value_len + headers_len + FIXED_OVERHEAD
}

#[cfg(test)]
mod tests {
    use tokio::sync::oneshot;

    use super::*;
    use crate::config::ProducerPartitioner;
    use crate::metadata::PartitionMetadata;

    /// Batch size limit handed to the partitioner in every test.
    const BATCH_SIZE: usize = 1024;

    /// Builds three partitions (ids 0, 1 and 2) that all carry the same metadata.
    fn partitions() -> Vec<(i32, PartitionMetadata)> {
        (0..3)
            .map(|id| {
                let metadata = PartitionMetadata {
                    leader_id: 1,
                    leader_epoch: 1,
                    replica_nodes: vec![1],
                    isr_nodes: vec![1],
                    offline_replicas: vec![],
                };
                (id, metadata)
            })
            .collect()
    }

    /// Runs the default strategy for `record` with the shared batch size.
    fn choose(
        state: &mut DefaultPartitionerState,
        record: &ProduceRecord,
        available: &[(i32, PartitionMetadata)],
        accumulator: &RecordAccumulator,
    ) -> i32 {
        state.partition(
            ProducerPartitioner::Default,
            record,
            available,
            accumulator,
            BATCH_SIZE,
        )
    }

    #[test]
    fn keyed_records_pick_a_deterministic_partition() {
        let available = partitions();
        let mut state = DefaultPartitionerState::default();
        let accumulator = RecordAccumulator::default();
        let record =
            ProduceRecord::without_partition("topic-a", b"value".as_slice()).with_key("stable-key");

        // Asking twice for the same keyed record must give the same answer.
        let first = choose(&mut state, &record, &available, &accumulator);
        let second = choose(&mut state, &record, &available, &accumulator);

        assert_eq!(first, second);
    }

    #[test]
    fn unkeyed_records_stick_until_the_batch_is_full() {
        let available = partitions();
        let mut state = DefaultPartitionerState::default();
        let mut accumulator = RecordAccumulator::default();
        let small_one = ProduceRecord::without_partition("topic-a", b"one".as_slice());
        let small_two = ProduceRecord::without_partition("topic-a", b"two".as_slice());
        // Larger than the batch limit, so it cannot join the current sticky batch.
        let oversized = ProduceRecord::without_partition("topic-a", vec![b'x'; 4096]);

        let first_partition = choose(&mut state, &small_one, &available, &accumulator);
        accumulator.append(
            small_one.with_partition(first_partition),
            oneshot::channel().0,
            1024,
        );

        // A second small record still fits, so the partition must not change.
        let second_partition = choose(&mut state, &small_two, &available, &accumulator);
        assert_eq!(first_partition, second_partition);
        accumulator.append(
            small_two.with_partition(second_partition),
            oneshot::channel().0,
            1024,
        );

        // The oversized record forces a move to a different partition.
        let third_partition = choose(&mut state, &oversized, &available, &accumulator);
        assert_ne!(first_partition, third_partition);
    }
}