use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use parking_lot::Mutex;
use crate::PartitionId;
/// Computes the 32-bit MurmurHash2 of `data` with the fixed seed `0x9747b28c`.
///
/// The input is consumed as little-endian 32-bit words; up to three trailing
/// bytes are folded in afterwards, followed by the final avalanche steps. All
/// multiplications wrap modulo 2^32, and the byte length is truncated to 32
/// bits before it is mixed into the seed.
#[inline]
pub fn murmur2(data: &[u8]) -> u32 {
    const SEED: u32 = 0x9747b28c;
    const M: u32 = 0x5bd1e995;
    const R: u32 = 24;

    let mut hash = SEED ^ (data.len() as u32);

    // Body: mix every complete 4-byte word into the running hash.
    let mut words = data.chunks_exact(4);
    for word in words.by_ref() {
        let mut k = u32::from_le_bytes([word[0], word[1], word[2], word[3]]).wrapping_mul(M);
        k ^= k >> R;
        hash = hash.wrapping_mul(M) ^ k.wrapping_mul(M);
    }

    // Tail: the 1..=3 leftover bytes form a partial little-endian word. The
    // bytes occupy disjoint bit ranges, so OR-ing them together is the same
    // as XOR-ing each one into the hash separately.
    let tail = words.remainder();
    if !tail.is_empty() {
        let partial = tail
            .iter()
            .rev()
            .fold(0u32, |acc, &byte| (acc << 8) | u32::from(byte));
        hash = (hash ^ partial).wrapping_mul(M);
    }

    // Final avalanche.
    hash ^= hash >> 13;
    hash = hash.wrapping_mul(M);
    hash ^ (hash >> 15)
}
/// Maps `key` to a partition index in `0..partition_count`.
///
/// The key is hashed with [`murmur2`], the top bit of the hash is cleared so
/// the value is non-negative when viewed as a signed 32-bit integer, and the
/// result is reduced modulo `partition_count`.
///
/// `partition_count` must be non-zero (the modulo panics otherwise); every
/// caller in this file returns early on a zero count before reaching here.
#[inline]
fn partition_for_key(key: &[u8], partition_count: usize) -> PartitionId {
    let non_negative_hash = murmur2(key) & 0x7fff_ffff;
    let slot = non_negative_hash as usize % partition_count;
    slot as PartitionId
}
/// Draws a random partition index from `0..partition_count`.
///
/// The count is narrowed to `u32` for sampling and the drawn value is
/// returned as `i32`. A non-zero count is expected; this is only checked in
/// debug builds.
#[inline]
fn random_partition(partition_count: usize) -> i32 {
    debug_assert!(partition_count > 0);
    let upper_bound = partition_count as u32;
    let drawn: u32 = rand::random_range(0..upper_bound);
    drawn as i32
}
/// Strategy for mapping a record to a partition of its topic.
///
/// Implementors must be `Send + Sync`; every method takes `&self`, so any
/// state an implementation keeps has to use interior mutability.
pub trait Partitioner: Send + Sync {
/// Returns the partition for a record of `topic` with the optional `key`,
/// given that the topic currently has `partition_count` partitions.
///
/// Every implementation in this file returns `0` when `partition_count`
/// is `0`.
fn partition(&self, topic: &str, key: Option<&[u8]>, partition_count: usize) -> PartitionId;
/// Notification hook with a no-op default body.
///
/// NOTE(review): presumably called by the producer when a new batch is
/// started for `topic`, with `_prev_partition` being the partition the
/// previous batch went to — confirm against the caller.
#[inline]
fn on_new_batch(&self, _topic: &str, _prev_partition: PartitionId, _partition_count: usize) {}
}
#[derive(Debug)]
pub struct DefaultPartitioner {
per_topic_counter: Mutex<HashMap<String, usize>>,
}
impl DefaultPartitioner {
pub fn new() -> Self {
Self {
per_topic_counter: Mutex::new(HashMap::new()),
}
}
}
impl Default for DefaultPartitioner {
fn default() -> Self {
Self::new()
}
}
impl Partitioner for DefaultPartitioner {
    /// Picks the partition for a record of `topic`.
    ///
    /// * `partition_count == 0` always yields `0`.
    /// * A non-empty `key` is mapped with `partition_for_key`, so equal keys
    ///   land on the same partition regardless of `topic`.
    /// * A missing or empty `key` is distributed round-robin. Each topic has
    ///   its own counter that starts at `0` and wraps on overflow.
    ///
    /// One counter entry is kept per distinct topic for the lifetime of the
    /// partitioner; entries are never removed.
    #[inline]
    fn partition(&self, topic: &str, key: Option<&[u8]>, partition_count: usize) -> PartitionId {
        if partition_count == 0 {
            return 0;
        }
        match key {
            Some(k) if !k.is_empty() => partition_for_key(k, partition_count),
            _ => {
                let mut counters = self.per_topic_counter.lock();
                // Look the topic up by `&str` first: the owned `String` key is
                // only allocated the first time a topic is seen, not on every
                // unkeyed record while the lock is held.
                let idx = if let Some(counter) = counters.get_mut(topic) {
                    let current = *counter;
                    *counter = current.wrapping_add(1);
                    current
                } else {
                    // First unkeyed record for this topic: it uses index 0 and
                    // the stored counter already points at the next one.
                    counters.insert(topic.to_owned(), 1);
                    0
                };
                (idx % partition_count) as PartitionId
            }
        }
    }
}
/// Partitioner that cycles through the partitions in order, using a single
/// counter shared by all topics.
#[derive(Debug, Default)]
pub struct RoundRobinPartitioner {
    /// Number of records routed so far; the next record uses this value
    /// modulo the partition count.
    counter: AtomicUsize,
}

impl RoundRobinPartitioner {
    /// Creates a partitioner whose counter starts at zero.
    pub fn new() -> Self {
        Self::default()
    }
}
impl Partitioner for RoundRobinPartitioner {
    /// Returns the next partition in the cycle, ignoring both topic and key.
    ///
    /// A zero `partition_count` yields `0` and leaves the counter untouched.
    /// Otherwise the counter is advanced by one (wrapping on overflow) and
    /// its previous value modulo `partition_count` is returned.
    #[inline]
    fn partition(&self, _topic: &str, _key: Option<&[u8]>, partition_count: usize) -> PartitionId {
        match partition_count {
            0 => 0,
            count => {
                let ticket = self.counter.fetch_add(1, Ordering::Relaxed);
                (ticket % count) as PartitionId
            }
        }
    }
}
/// Partitioner that keeps unkeyed records on one "sticky" partition at a time.
///
/// The sticky index and the record counter are single values shared by every
/// topic routed through the instance.
#[derive(Debug)]
pub struct StickyPartitioner {
    /// Index of the partition unkeyed records are currently routed to.
    current: AtomicUsize,
    /// Number of unkeyed records routed so far.
    counter: AtomicUsize,
    /// Number of unkeyed records after which `partition` moves on to another
    /// partition. Both constructors guarantee a value of at least 1.
    batch_threshold: usize,
}

impl StickyPartitioner {
    /// Creates a partitioner that starts on partition 0 and uses a batch
    /// threshold of 100 records.
    pub fn new() -> Self {
        Self::with_batch_threshold(100)
    }

    /// Creates a partitioner that starts on partition 0 and uses the given
    /// batch threshold. A `threshold` of 0 is clamped to 1.
    pub fn with_batch_threshold(threshold: usize) -> Self {
        Self {
            current: AtomicUsize::new(0),
            counter: AtomicUsize::new(0),
            batch_threshold: threshold.max(1),
        }
    }

    /// Manually moves the sticky index to the next partition, wrapping back
    /// to 0 after the last one.
    ///
    /// The stored index is always left inside `0..partition_count`, so it
    /// can never refer to a partition that does not exist for that count.
    /// Does nothing when `partition_count` is 0.
    pub fn next_partition(&self, partition_count: usize) {
        if partition_count == 0 {
            return;
        }
        // A single atomic read-modify-write keeps concurrent callers from
        // losing an advance. The closure always returns `Some`, so the update
        // cannot fail and the result carries no information.
        let _ = self
            .current
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |index| {
                Some(index.wrapping_add(1) % partition_count)
            });
    }
}

impl Default for StickyPartitioner {
    /// Equivalent to [`StickyPartitioner::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl Partitioner for StickyPartitioner {
    /// Picks the partition for a record; the topic is ignored.
    ///
    /// * `partition_count == 0` always yields `0`.
    /// * A non-empty `key` is mapped with `partition_for_key`.
    /// * A missing or empty `key` goes to the current sticky partition. Each
    ///   such call bumps the record counter, and whenever the counter reaches
    ///   a positive multiple of the batch threshold the sticky index is set
    ///   to `(counter / threshold) % partition_count`.
    ///
    /// The returned partition is always inside `0..partition_count`, even if
    /// the stored index was advanced past the end or was chosen for a larger
    /// partition count.
    #[inline]
    fn partition(&self, _topic: &str, key: Option<&[u8]>, partition_count: usize) -> PartitionId {
        if partition_count == 0 {
            return 0;
        }
        match key {
            Some(k) if !k.is_empty() => partition_for_key(k, partition_count),
            _ => {
                let count = self.counter.fetch_add(1, Ordering::Relaxed);
                // `checked_rem` yields `None` for a zero threshold, so the
                // division below is only reached with a non-zero divisor.
                if count > 0 && count.checked_rem(self.batch_threshold) == Some(0) {
                    let batch = count / self.batch_threshold;
                    self.current
                        .store(batch % partition_count, Ordering::Release);
                }
                // Reduce on the way out: the stored index may be stale with
                // respect to the partition count passed to this call.
                let sticky = self.current.load(Ordering::Acquire);
                (sticky % partition_count) as PartitionId
            }
        }
    }
}
/// Stateless partitioner that routes purely by key hash.
#[derive(Debug, Default)]
pub struct HashPartitioner;

impl HashPartitioner {
    /// Creates the (field-less) partitioner.
    pub fn new() -> Self {
        HashPartitioner
    }
}
impl Partitioner for HashPartitioner {
    /// Picks the partition for a record; the topic is ignored.
    ///
    /// A non-empty `key` is mapped with `partition_for_key`. A missing or
    /// empty key, or a `partition_count` of 0, yields partition `0`.
    #[inline]
    fn partition(&self, _topic: &str, key: Option<&[u8]>, partition_count: usize) -> PartitionId {
        if partition_count == 0 {
            return 0;
        }
        key.filter(|k| !k.is_empty())
            .map_or(0, |k| partition_for_key(k, partition_count))
    }
}
#[derive(Debug, Default)]
pub struct UniformStickyPartitioner {
sticky: Mutex<HashMap<String, i32>>,
}
impl UniformStickyPartitioner {
pub const MAX_TRACKED_TOPICS: usize = 10_000;
pub fn new() -> Self {
Self::default()
}
fn pick_new_partition(partition_count: usize, avoid: i32) -> i32 {
if partition_count <= 1 {
return 0;
}
let candidate = random_partition(partition_count);
if candidate == avoid {
(candidate + 1) % partition_count as i32
} else {
candidate
}
}
}
impl Partitioner for UniformStickyPartitioner {
    /// Picks the partition for a record of `topic`.
    ///
    /// * `partition_count == 0` always yields `0`.
    /// * A non-empty `key` is mapped with `partition_for_key`.
    /// * Otherwise the partition remembered for `topic` is returned. A topic
    ///   seen for the first time gets a random partition; if the table
    ///   already holds `MAX_TRACKED_TOPICS` entries, an arbitrary one is
    ///   dropped first. A remembered partition that no longer fits into
    ///   `0..partition_count` is replaced by a fresh random one.
    #[inline]
    fn partition(&self, topic: &str, key: Option<&[u8]>, partition_count: usize) -> PartitionId {
        if partition_count == 0 {
            return 0;
        }
        if let Some(k) = key.filter(|k| !k.is_empty()) {
            return partition_for_key(k, partition_count);
        }

        let mut map = self.sticky.lock();
        let remembered = map.get(topic).copied();
        let candidate = match remembered {
            Some(partition) => partition,
            None => {
                // Keep the table bounded: make room before adding a new topic.
                if map.len() >= Self::MAX_TRACKED_TOPICS {
                    let victim = map.keys().next().cloned();
                    if let Some(victim) = victim {
                        map.remove(&victim);
                    }
                }
                let fresh = random_partition(partition_count);
                map.insert(topic.to_string(), fresh);
                fresh
            }
        };

        if (candidate as usize) < partition_count {
            return candidate;
        }

        // The remembered value is out of range for this partition count
        // (a negative value also lands here after the cast): re-pick and
        // remember the replacement.
        let replacement = random_partition(partition_count);
        map.insert(topic.to_string(), replacement);
        replacement
    }

    /// Moves `topic` to a different partition, but only if the remembered
    /// partition still equals `prev_partition`.
    ///
    /// Topics that are not remembered, and a `partition_count` of 0, leave
    /// the table untouched.
    fn on_new_batch(&self, topic: &str, prev_partition: PartitionId, partition_count: usize) {
        if partition_count == 0 {
            return;
        }
        // The replacement is drawn before the lock is taken.
        let replacement = Self::pick_new_partition(partition_count, prev_partition);
        let mut map = self.sticky.lock();
        let slot = map
            .get_mut(topic)
            .filter(|remembered| **remembered == prev_partition);
        if let Some(slot) = slot {
            *slot = replacement;
        }
    }
}
// Unit tests for the partitioners and the murmur2 hash defined in this file.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
// The same key must map to the same partition on repeated calls.
#[test]
fn test_default_partitioner_with_key() {
let partitioner = DefaultPartitioner::new();
let p1 = partitioner.partition("topic", Some(b"key1"), 10);
let p2 = partitioner.partition("topic", Some(b"key1"), 10);
assert_eq!(p1, p2);
// A second key is only exercised; nothing is asserted about its partition.
let p3 = partitioner.partition("topic", Some(b"key2"), 10);
let _ = p3; }
// With 3 partitions the fourth unkeyed record wraps around to the partition
// of the first one.
#[test]
fn test_default_partitioner_without_key() {
let partitioner = DefaultPartitioner::new();
let p1 = partitioner.partition("topic", None, 3);
let _p2 = partitioner.partition("topic", None, 3);
let _p3 = partitioner.partition("topic", None, 3);
let p4 = partitioner.partition("topic", None, 3);
assert_eq!(p4, p1); }
// murmur2 is deterministic and distinguishes these two inputs.
#[test]
fn test_default_partitioner_murmur2() {
let hash1 = murmur2(b"test");
let hash2 = murmur2(b"test");
assert_eq!(hash1, hash2);
let hash3 = murmur2(b"different");
assert_ne!(hash1, hash3);
}
// Round-robin ignores the key and cycles 0, 1, 2, 0, 1, 2.
#[test]
fn test_round_robin_partitioner() {
let partitioner = RoundRobinPartitioner::new();
let partitions: Vec<_> = (0..6)
.map(|_| partitioner.partition("topic", Some(b"key"), 3))
.collect();
assert_eq!(partitions, vec![0, 1, 2, 0, 1, 2]);
}
// Unkeyed records stick to one partition until `next_partition` is called.
#[test]
fn test_sticky_partitioner() {
let partitioner = StickyPartitioner::new();
let p1 = partitioner.partition("topic", None, 3);
let p2 = partitioner.partition("topic", None, 3);
assert_eq!(p1, p2);
partitioner.next_partition(3);
let p3 = partitioner.partition("topic", None, 3);
assert_ne!(p1, p3);
}
// With a threshold of 5 the partition changes on the 6th and again on the
// 11th unkeyed record; a threshold of 0 is clamped to 1.
#[test]
fn test_sticky_partitioner_auto_advance() {
let partitioner = StickyPartitioner::with_batch_threshold(5);
assert_eq!(
partitioner.batch_threshold, 5,
"with_batch_threshold should set custom threshold"
);
let partitioner_min = StickyPartitioner::with_batch_threshold(0);
assert_eq!(
partitioner_min.batch_threshold, 1,
"with_batch_threshold(0) should clamp to 1"
);
let partition_count = 3;
// Records 1..=5 all go to the initial partition.
let initial = partitioner.partition("topic", None, partition_count);
for i in 1..5 {
let p = partitioner.partition("topic", None, partition_count);
assert_eq!(p, initial, "call {i} should still be on initial partition");
}
// Record 6 triggers the first advance.
let after_advance = partitioner.partition("topic", None, partition_count);
assert_ne!(
after_advance, initial,
"after batch_threshold calls, partition should auto-advance to a different partition"
);
for i in 0..4 {
let p = partitioner.partition("topic", None, partition_count);
assert_eq!(
p, after_advance,
"call {i} after advance should stay on new partition"
);
}
// Record 11 triggers the second advance.
let after_second_advance = partitioner.partition("topic", None, partition_count);
assert_ne!(
after_second_advance, after_advance,
"should auto-advance again after another batch_threshold calls"
);
}
// Keyed records are routed consistently by the sticky partitioner.
#[test]
fn test_sticky_partitioner_with_key() {
let partitioner = StickyPartitioner::new();
let p1 = partitioner.partition("topic", Some(b"key1"), 10);
let p2 = partitioner.partition("topic", Some(b"key1"), 10);
assert_eq!(p1, p2);
}
// Hash partitioner: stable for a key, partition 0 without a key.
#[test]
fn test_hash_partitioner() {
let partitioner = HashPartitioner::new();
let p1 = partitioner.partition("topic", Some(b"key"), 10);
let p2 = partitioner.partition("topic", Some(b"key"), 10);
assert_eq!(p1, p2);
let p3 = partitioner.partition("topic", None, 10);
assert_eq!(p3, 0);
}
// Every partitioner returns 0 when the topic has zero partitions.
#[test]
fn test_partitioners_with_zero_partitions() {
let default = DefaultPartitioner::new();
let round_robin = RoundRobinPartitioner::new();
let sticky = StickyPartitioner::new();
let hash = HashPartitioner::new();
let uniform = UniformStickyPartitioner::new();
assert_eq!(default.partition("topic", Some(b"key"), 0), 0);
assert_eq!(round_robin.partition("topic", Some(b"key"), 0), 0);
assert_eq!(sticky.partition("topic", Some(b"key"), 0), 0);
assert_eq!(hash.partition("topic", Some(b"key"), 0), 0);
assert_eq!(uniform.partition("topic", Some(b"key"), 0), 0);
assert_eq!(uniform.partition("topic", None, 0), 0);
}
// The partition chosen for a topic stays the same across unkeyed calls and
// is within range; a second topic also gets an in-range partition.
#[test]
fn test_uniform_sticky_partitioner_basic() {
let p = UniformStickyPartitioner::new();
let first = p.partition("topic", None, 8);
assert!(first < 8);
for _ in 0..20 {
assert_eq!(p.partition("topic", None, 8), first);
}
let other = p.partition("other-topic", None, 8);
assert!(other < 8);
}
// Keyed records are routed consistently and stay within range.
#[test]
fn test_uniform_sticky_partitioner_keyed() {
let p = UniformStickyPartitioner::new();
let k1a = p.partition("topic", Some(b"key1"), 8);
let k1b = p.partition("topic", Some(b"key1"), 8);
assert_eq!(k1a, k1b);
let k2 = p.partition("topic", Some(b"key2"), 8);
assert!(k2 < 8);
}
// `on_new_batch` moves the topic off the previous partition; a second call
// that still names the old partition is ignored.
#[test]
fn test_uniform_sticky_on_new_batch() {
let p = UniformStickyPartitioner::new();
let prev = p.partition("topic", None, 8);
p.on_new_batch("topic", prev, 8);
let next = p.partition("topic", None, 8);
assert_ne!(next, prev, "sticky should advance after on_new_batch");
assert!(next < 8);
p.on_new_batch("topic", prev, 8);
assert_eq!(p.partition("topic", None, 8), next);
}
// A partition remembered for 64 partitions must not leak out once the topic
// has a single partition.
#[test]
fn test_uniform_sticky_partition_count_shrink() {
let p = UniformStickyPartitioner::new();
let _ = p.partition("topic", None, 64);
let result = p.partition("topic", None, 1);
assert_eq!(result, 0);
}
// With a single partition the answer is always 0, before and after
// `on_new_batch`.
#[test]
fn test_uniform_sticky_single_partition() {
let p = UniformStickyPartitioner::new();
let result = p.partition("topic", None, 1);
assert_eq!(result, 0);
p.on_new_batch("topic", 0, 1);
assert_eq!(p.partition("topic", None, 1), 0);
}
// `on_new_batch` for a topic that was never partitioned must not break a
// later `partition` call for it.
#[test]
fn test_uniform_sticky_on_new_batch_unknown_topic() {
let p = UniformStickyPartitioner::new();
p.on_new_batch("unknown", 0, 4);
let result = p.partition("unknown", None, 4);
assert!(result < 4);
}
// Eight threads interleave `partition` and `on_new_batch` on one topic; every
// returned partition must stay in range.
#[test]
fn test_uniform_sticky_concurrent_safety() {
use std::sync::Arc;
use std::thread;
let p = Arc::new(UniformStickyPartitioner::new());
let mut handles = Vec::new();
for _ in 0..8 {
let p = Arc::clone(&p);
handles.push(thread::spawn(move || {
for _ in 0..1000 {
let part = p.partition("topic", None, 16);
assert!(part < 16, "got out-of-range partition {part}");
p.on_new_batch("topic", part, 16);
}
}));
}
for h in handles {
h.join().expect("thread panicked");
}
}
// Pins murmur2 against fixed reference values for inputs of several lengths.
#[test]
fn murmur2_java_compatibility() {
assert_eq!(murmur2(b"abc"), 0x1c94_221b, "murmur2(b\"abc\") mismatch");
assert_eq!(murmur2(b""), 0x106e_08d9, "murmur2(b\"\") mismatch");
assert_eq!(murmur2(b"21"), 0xc5f2_f8ec, "murmur2(b\"21\") mismatch");
assert_eq!(
murmur2(b"foobar"),
0xd0e4_7bbe,
"murmur2(b\"foobar\") mismatch"
);
assert_eq!(
murmur2(b"a-little-bit-of-whatever"),
0x5795_e613,
"murmur2(b\"a-little-bit-of-whatever\") mismatch",
);
}
}