use std::collections::HashMap;
use crate::config::ProducerPartitioner;
use crate::metadata::PartitionMetadata;
use crate::types::{ProduceRecord, TopicPartitionKey};
use super::accumulator::RecordAccumulator;
/// Per-topic state kept by the default partitioner between calls.
///
/// Only the sticky path (records with no explicit partition and either no key or keys ignored)
/// reads or writes this state; explicit and keyed partitioning are stateless.
#[derive(Debug, Default)]
pub struct DefaultPartitionerState {
    /// Partition that currently receives sticky records, keyed by topic name.
    sticky_by_topic: HashMap<String, i32>,
    /// Round-robin cursor per topic name. It is advanced every time a new sticky partition is
    /// chosen and reduced modulo the candidate count when used.
    next_index_by_topic: HashMap<String, usize>,
}
impl DefaultPartitionerState {
    /// Chooses the partition `record` should be produced to, using `strategy`.
    ///
    /// `partitions` holds the candidate `(partition_id, metadata)` pairs for `record.topic`.
    /// `accumulator` and `batch_size` are consulted only on the sticky path, to decide whether
    /// the topic's current sticky partition can still take `record`. When `ignore_keys` is
    /// `true`, keyed records are treated as if they had no key.
    ///
    /// # Panics
    ///
    /// Panics if `partitions` is empty and `record` has no explicit partition.
    pub fn partition(
        &mut self,
        strategy: ProducerPartitioner,
        record: &ProduceRecord,
        partitions: &[(i32, PartitionMetadata)],
        accumulator: &RecordAccumulator,
        batch_size: usize,
        ignore_keys: bool,
    ) -> i32 {
        match strategy {
            ProducerPartitioner::Default => {
                self.default_partition(record, partitions, accumulator, batch_size, ignore_keys)
            }
        }
    }

    /// Default strategy. It uses the first of these that applies:
    /// 1. the record's explicit partition;
    /// 2. a hash of its key, unless `ignore_keys` is set;
    /// 3. the topic's sticky partition.
    fn default_partition(
        &mut self,
        record: &ProduceRecord,
        partitions: &[(i32, PartitionMetadata)],
        accumulator: &RecordAccumulator,
        batch_size: usize,
        ignore_keys: bool,
    ) -> i32 {
        if let Some(partition) = record.partition {
            return partition;
        }
        if !ignore_keys && let Some(key) = record.key.as_ref() {
            return keyed_partition(key, partitions);
        }
        self.sticky_partition(record, partitions, accumulator, batch_size)
    }

    /// Sticky strategy: keep returning the topic's current partition while it is still a
    /// candidate and the accumulator reports `record` can be appended to it. Otherwise advance
    /// the topic's round-robin cursor and make the result the new sticky partition.
    fn sticky_partition(
        &mut self,
        record: &ProduceRecord,
        partitions: &[(i32, PartitionMetadata)],
        accumulator: &RecordAccumulator,
        batch_size: usize,
    ) -> i32 {
        if let Some(&sticky_partition) = self.sticky_by_topic.get(&record.topic) {
            // This call's candidate list may differ from the one the sticky choice was made
            // against. Never hand back a partition the caller no longer offers.
            let still_candidate = partitions.iter().any(|&(id, _)| id == sticky_partition);
            if still_candidate {
                let key = TopicPartitionKey::new(record.topic.clone(), sticky_partition);
                if accumulator.can_append_to(&key, batch_size, estimate_record_size(record)) {
                    return sticky_partition;
                }
            }
        }
        let cursor = self
            .next_index_by_topic
            .entry(record.topic.clone())
            .or_default();
        let chosen = next_partition(cursor, partitions);
        self.sticky_by_topic.insert(record.topic.clone(), chosen);
        chosen
    }
}
/// Returns the partition id at the round-robin cursor `next_index` and advances the cursor.
///
/// The cursor is reduced modulo `partitions.len()` on each call, so it stays usable even if the
/// number of candidates changes between calls.
///
/// # Panics
///
/// Panics if `partitions` is empty.
fn next_partition(next_index: &mut usize, partitions: &[(i32, PartitionMetadata)]) -> i32 {
    assert!(
        !partitions.is_empty(),
        "cannot choose a sticky partition: no partitions available"
    );
    let index = *next_index % partitions.len();
    // Wrap rather than saturate. A cursor stuck at `usize::MAX` would map every later call to
    // the same partition and stop the rotation for good.
    *next_index = next_index.wrapping_add(1);
    partitions[index].0
}
/// Maps `key` to a partition. The key is hashed with FNV-1a, and the hash modulo the number of
/// candidates selects a position in `partitions`.
///
/// The result is the id stored at that position, so the mapping depends on the length and order
/// of the slice the caller passes. The same key and the same slice always give the same
/// partition.
///
/// NOTE(review): this looks like a Kafka producer. The Java client's default partitioner hashes
/// keys with murmur2, so a key may land on a different partition here than it would from a Java
/// producer. Confirm this is intended.
///
/// # Panics
///
/// Panics if `partitions` is empty.
fn keyed_partition(key: &[u8], partitions: &[(i32, PartitionMetadata)]) -> i32 {
    assert!(
        !partitions.is_empty(),
        "cannot choose a partition for a keyed record: no partitions available"
    );
    let count = u64::try_from(partitions.len()).expect("usize always fits in u64");
    let index =
        usize::try_from(fnv1a(key) % count).expect("remainder is below partitions.len()");
    partitions[index].0
}
/// 64-bit FNV-1a hash of `bytes`.
///
/// Starts from the FNV offset basis. For each byte it XORs the byte in, then multiplies by the
/// FNV prime modulo 2^64.
fn fnv1a(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    bytes
        .iter()
        .fold(OFFSET_BASIS, |acc, &byte| (acc ^ u64::from(byte)).wrapping_mul(PRIME))
}
/// Approximates how many bytes `record` will take up in a batch. The sticky path passes this
/// estimate to the accumulator's append check.
///
/// Sums the lengths of the topic name, key, value and every header's key and value, then adds a
/// fixed 64-byte allowance. The allowance is presumably for per-record framing; it is not
/// derived from any wire format here. A missing key, value or header value counts as zero bytes.
fn estimate_record_size(record: &ProduceRecord) -> usize {
    const FIXED_OVERHEAD: usize = 64;

    let headers_len: usize = record
        .headers
        .iter()
        .map(|header| header.key.len() + header.value.as_ref().map_or(0, |v| v.len()))
        .sum();
    let key_len = record.key.as_ref().map_or(0, |k| k.len());
    let value_len = record.value.as_ref().map_or(0, |v| v.len());

    record.topic.len() + key_len + value_len + headers_len + FIXED_OVERHEAD
}
#[cfg(test)]
mod tests {
    use tokio::sync::oneshot;

    use super::*;
    use crate::config::ProducerPartitioner;
    use crate::metadata::PartitionMetadata;

    /// Batch size handed to the partitioner in every test.
    const BATCH_SIZE: usize = 1024;

    /// Three single-replica partitions with ids 0, 1 and 2, all led by broker 1.
    fn partitions() -> Vec<(i32, PartitionMetadata)> {
        (0..3)
            .map(|id| {
                (
                    id,
                    PartitionMetadata {
                        leader_id: 1,
                        leader_epoch: 1,
                        replica_nodes: vec![1],
                        isr_nodes: vec![1],
                        offline_replicas: vec![],
                    },
                )
            })
            .collect()
    }

    /// Runs the default strategy for `record` with [`BATCH_SIZE`].
    fn pick(
        partitioner: &mut DefaultPartitionerState,
        record: &ProduceRecord,
        partitions: &[(i32, PartitionMetadata)],
        accumulator: &RecordAccumulator,
        ignore_keys: bool,
    ) -> i32 {
        partitioner.partition(
            ProducerPartitioner::Default,
            record,
            partitions,
            accumulator,
            BATCH_SIZE,
            ignore_keys,
        )
    }

    #[test]
    fn keyed_records_pick_a_deterministic_partition() {
        let partitions = partitions();
        let accumulator = RecordAccumulator::default();
        let mut partitioner = DefaultPartitionerState::default();
        let record =
            ProduceRecord::without_partition("topic-a", b"value".as_slice()).with_key("stable-key");

        let picks: Vec<i32> = (0..2)
            .map(|_| pick(&mut partitioner, &record, &partitions, &accumulator, false))
            .collect();

        assert_eq!(picks[0], picks[1]);
    }

    #[test]
    fn unkeyed_records_stick_until_the_batch_is_full() {
        let partitions = partitions();
        let mut accumulator = RecordAccumulator::default();
        let mut partitioner = DefaultPartitionerState::default();
        let first = ProduceRecord::without_partition("topic-a", b"one".as_slice());
        let second = ProduceRecord::without_partition("topic-a", b"two".as_slice());
        let oversized = ProduceRecord::without_partition("topic-a", vec![b'x'; 4096]);

        let first_partition = pick(&mut partitioner, &first, &partitions, &accumulator, false);
        accumulator.append(
            first.with_partition(first_partition),
            oneshot::channel().0,
            1024,
        );

        let second_partition = pick(&mut partitioner, &second, &partitions, &accumulator, false);
        assert_eq!(first_partition, second_partition);
        accumulator.append(
            second.with_partition(second_partition),
            oneshot::channel().0,
            1024,
        );

        // A 4 KiB value exceeds the 1 KiB batch size, so the sticky partition should be dropped.
        let oversized_partition =
            pick(&mut partitioner, &oversized, &partitions, &accumulator, false);
        assert_ne!(first_partition, oversized_partition);
    }

    #[test]
    fn ignore_keys_uses_sticky_partitioning_for_keyed_records() {
        let partitions = partitions();
        let accumulator = RecordAccumulator::default();
        let mut partitioner = DefaultPartitionerState::default();
        let record =
            ProduceRecord::without_partition("topic-a", b"value".as_slice()).with_key("stable-key");

        let keyed = pick(&mut partitioner, &record, &partitions, &accumulator, false);
        let ignored = pick(&mut partitioner, &record, &partitions, &accumulator, true);

        let key = record.key.as_deref().expect("record was built with a key");
        assert_eq!(keyed, keyed_partition(key, &partitions));
        // A fresh partitioner's first sticky choice is the first candidate.
        assert_eq!(ignored, partitions[0].0);
    }
}