use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicUsize, Ordering};
use crate::PartitionId;
/// Computes the 32-bit MurmurHash2 digest of `data`.
///
/// The hash starts from the fixed seed `0x9747b28c` XORed with the input
/// length (truncated to `u32`). Every complete 4-byte group is read as a
/// little-endian word and mixed in; the 1–3 bytes left over, if any, are
/// folded in afterwards, followed by the final avalanche steps. All
/// multiplications wrap at 32 bits.
///
/// The result depends only on the bytes of `data`, so equal inputs always
/// produce equal hashes.
#[inline]
pub fn murmur2(data: &[u8]) -> u32 {
    const SEED: u32 = 0x9747_b28c;
    const M: u32 = 0x5bd1_e995;
    const R: u32 = 24;

    let mut state = SEED ^ (data.len() as u32);

    // Body: mix in each full little-endian 32-bit word.
    let mut words = data.chunks_exact(4);
    for word in words.by_ref() {
        let mut k = u32::from_le_bytes([word[0], word[1], word[2], word[3]]).wrapping_mul(M);
        k ^= k >> R;
        state = state.wrapping_mul(M) ^ k.wrapping_mul(M);
    }

    // Tail: byte `n` of the leftover lands in bits `8n..8n+8`. The lanes are
    // disjoint, so OR-ing them first and XOR-ing once equals XOR-ing each byte.
    let tail = words.remainder();
    if !tail.is_empty() {
        let folded = tail
            .iter()
            .enumerate()
            .fold(0u32, |acc, (pos, &byte)| acc | (u32::from(byte) << (8 * pos)));
        state = (state ^ folded).wrapping_mul(M);
    }

    // Final avalanche.
    state ^= state >> 13;
    state = state.wrapping_mul(M);
    state ^ (state >> 15)
}
/// Strategy for choosing which partition of a topic a record is assigned to.
///
/// Implementors must be `Send + Sync`, so a single instance can be shared
/// between threads.
pub trait Partitioner: Send + Sync {
/// Selects a partition for one record.
///
/// * `topic` - name of the destination topic (the implementations in this
///   file ignore it).
/// * `key` - optional record key bytes.
/// * `partition_count` - number of partitions to choose from.
///
/// Every implementation in this file returns `0` when `partition_count`
/// is `0`.
fn partition(&self, topic: &str, key: Option<&[u8]>, partition_count: usize) -> PartitionId;
}
/// Partitioner that hashes keyed records with [`murmur2`] and spreads keyless
/// records round-robin.
///
/// The only state is an atomic counter, so a shared reference can be used
/// from several threads at once.
#[derive(Debug, Default)]
pub struct DefaultPartitioner {
    /// Number of keyless records routed so far; drives the round-robin order.
    counter: AtomicUsize,
}

impl DefaultPartitioner {
    /// Creates a partitioner whose round-robin counter starts at zero.
    pub fn new() -> Self {
        Self::default()
    }
}
impl Partitioner for DefaultPartitioner {
    /// Picks a partition for a record.
    ///
    /// * A non-empty `key` is hashed with [`murmur2`] and reduced modulo
    ///   `partition_count`, so equal keys map to the same partition for a
    ///   given count.
    /// * A missing or empty `key` takes the next slot of a shared round-robin
    ///   counter.
    /// * `partition_count == 0` yields `0` and leaves the counter untouched.
    ///
    /// NOTE(review): Kafka's Java client clears the sign bit of the murmur2
    /// hash before taking the modulo; this code uses the full unsigned value,
    /// so the two disagree for hashes with the top bit set. Confirm whether
    /// cross-client key placement is required before relying on it.
    #[inline]
    fn partition(&self, _topic: &str, key: Option<&[u8]>, partition_count: usize) -> PartitionId {
        if partition_count == 0 {
            return 0;
        }
        // Treat an empty key the same as no key at all.
        let slot = match key.filter(|bytes| !bytes.is_empty()) {
            Some(bytes) => murmur2(bytes) as usize % partition_count,
            None => self.counter.fetch_add(1, Ordering::Relaxed) % partition_count,
        };
        slot as PartitionId
    }
}
/// Partitioner that cycles through the partitions in order, one record at a
/// time, regardless of the record key.
#[derive(Debug, Default)]
pub struct RoundRobinPartitioner {
    /// Number of records routed so far; its value modulo the partition count
    /// selects the next partition.
    counter: AtomicUsize,
}

impl RoundRobinPartitioner {
    /// Creates a partitioner whose rotation starts at partition zero.
    pub fn new() -> Self {
        Self::default()
    }
}
impl Partitioner for RoundRobinPartitioner {
    /// Returns the next partition in rotation; `_topic` and `_key` are ignored.
    ///
    /// Each call with a non-zero `partition_count` advances the shared
    /// counter by one. With `partition_count == 0` the result is `0` and the
    /// counter is not advanced.
    #[inline]
    fn partition(&self, _topic: &str, _key: Option<&[u8]>, partition_count: usize) -> PartitionId {
        match partition_count {
            0 => 0,
            count => (self.counter.fetch_add(1, Ordering::Relaxed) % count) as PartitionId,
        }
    }
}
/// Partitioner that keeps keyless records on one "sticky" partition for a
/// batch of records before moving on, while keyed records are placed by hash.
#[derive(Debug)]
pub struct StickyPartitioner {
    /// Partition currently assigned to keyless records.
    current: AtomicUsize,
    /// Number of keyless records seen; used to trigger automatic rotation.
    counter: AtomicUsize,
    /// Keyless records per partition before rotating; always at least 1.
    batch_threshold: usize,
}

impl StickyPartitioner {
    /// Batch size used by [`StickyPartitioner::new`].
    const DEFAULT_BATCH_THRESHOLD: usize = 100;

    /// Creates a sticky partitioner that rotates after every 100 keyless
    /// records.
    pub fn new() -> Self {
        Self::with_batch_threshold(Self::DEFAULT_BATCH_THRESHOLD)
    }

    /// Creates a sticky partitioner that rotates after every `threshold`
    /// keyless records. A `threshold` of `0` is clamped to `1`.
    pub fn with_batch_threshold(threshold: usize) -> Self {
        Self {
            current: AtomicUsize::new(0),
            counter: AtomicUsize::new(0),
            batch_threshold: threshold.max(1),
        }
    }

    /// Manually moves the sticky partition to the next one, wrapping back to
    /// `0` after the last partition.
    ///
    /// Does nothing when `partition_count` is `0`.
    pub fn next_partition(&self, partition_count: usize) {
        if partition_count == 0 {
            return;
        }
        // Advance and wrap in one atomic step so the stored index always
        // stays below `partition_count`; a plain increment would eventually
        // leave an out-of-range partition in `current`. The closure always
        // returns `Some`, so the update cannot fail.
        let _ = self
            .current
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |cur| {
                Some(cur.wrapping_add(1) % partition_count)
            });
    }
}

impl Default for StickyPartitioner {
    /// Equivalent to [`StickyPartitioner::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl Partitioner for StickyPartitioner {
    /// Picks a partition for a record.
    ///
    /// * A non-empty `key` is hashed with [`murmur2`] and reduced modulo
    ///   `partition_count`.
    /// * A missing or empty `key` goes to the current sticky partition. After
    ///   every `batch_threshold` keyless records the sticky partition is
    ///   moved to `(records_seen / batch_threshold) % partition_count`.
    /// * `partition_count == 0` yields `0` without touching any state.
    ///
    /// The returned partition is always below `partition_count`.
    #[inline]
    fn partition(&self, _topic: &str, key: Option<&[u8]>, partition_count: usize) -> PartitionId {
        if partition_count == 0 {
            return 0;
        }
        // Treat an empty key the same as no key at all.
        if let Some(bytes) = key.filter(|bytes| !bytes.is_empty()) {
            return (murmur2(bytes) as usize % partition_count) as PartitionId;
        }

        let seen = self.counter.fetch_add(1, Ordering::Relaxed);
        if seen > 0 && seen.is_multiple_of(self.batch_threshold) {
            let batch_index = seen / self.batch_threshold;
            self.current
                .store(batch_index % partition_count, Ordering::Release);
        }
        // `current` is shared state: it may have been advanced through
        // `next_partition` or stored for a larger partition count on an
        // earlier call. Reduce it here so the result is never out of range.
        (self.current.load(Ordering::Acquire) % partition_count) as PartitionId
    }
}
/// Stateless partitioner that places keyed records using the standard
/// library's `DefaultHasher` and sends keyless records to partition `0`.
#[derive(Debug, Default)]
pub struct HashPartitioner;

impl HashPartitioner {
    /// Creates a new hash partitioner; equivalent to `HashPartitioner::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}
impl Partitioner for HashPartitioner {
    /// Picks a partition for a record.
    ///
    /// * A non-empty `key` is fed to a freshly created `DefaultHasher` and the
    ///   64-bit result, cast to `usize`, is reduced modulo `partition_count`.
    /// * A missing or empty `key`, or `partition_count == 0`, yields `0`.
    ///
    /// NOTE(review): the standard library does not specify the algorithm
    /// behind `DefaultHasher` and allows it to change between releases, so
    /// key placement may not be stable across toolchain upgrades.
    #[inline]
    fn partition(&self, _topic: &str, key: Option<&[u8]>, partition_count: usize) -> PartitionId {
        if partition_count == 0 {
            return 0;
        }
        // Treat an empty key the same as no key at all.
        key.filter(|bytes| !bytes.is_empty()).map_or(0, |bytes| {
            let mut state = DefaultHasher::new();
            bytes.hash(&mut state);
            (state.finish() as usize % partition_count) as PartitionId
        })
    }
}
// Unit tests for `murmur2` and the four partitioner implementations.
// The allow-list below relaxes the panic-related clippy lints for test code.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
// The same key must always map to the same partition.
#[test]
fn test_default_partitioner_with_key() {
let partitioner = DefaultPartitioner::new();
let p1 = partitioner.partition("topic", Some(b"key1"), 10);
let p2 = partitioner.partition("topic", Some(b"key1"), 10);
assert_eq!(p1, p2);
// A different key is only exercised here; nothing is asserted about its partition.
let p3 = partitioner.partition("topic", Some(b"key2"), 10);
let _ = p3; }
// Keyless records rotate through the partitions: with 3 partitions the
// fourth call lands on the same partition as the first.
#[test]
fn test_default_partitioner_without_key() {
let partitioner = DefaultPartitioner::new();
let p1 = partitioner.partition("topic", None, 3);
let _p2 = partitioner.partition("topic", None, 3);
let _p3 = partitioner.partition("topic", None, 3);
let p4 = partitioner.partition("topic", None, 3);
assert_eq!(p4, p1); }
// `murmur2` is deterministic and separates these two sample inputs.
#[test]
fn test_default_partitioner_murmur2() {
let hash1 = murmur2(b"test");
let hash2 = murmur2(b"test");
assert_eq!(hash1, hash2);
let hash3 = murmur2(b"different");
assert_ne!(hash1, hash3);
}
// Round-robin ignores the key and cycles 0, 1, 2, 0, 1, 2 over 3 partitions.
#[test]
fn test_round_robin_partitioner() {
let partitioner = RoundRobinPartitioner::new();
let partitions: Vec<_> = (0..6)
.map(|_| partitioner.partition("topic", Some(b"key"), 3))
.collect();
assert_eq!(partitions, vec![0, 1, 2, 0, 1, 2]);
}
// Keyless records stay on one partition until `next_partition` is called.
#[test]
fn test_sticky_partitioner() {
let partitioner = StickyPartitioner::new();
let p1 = partitioner.partition("topic", None, 3);
let p2 = partitioner.partition("topic", None, 3);
assert_eq!(p1, p2);
partitioner.next_partition(3);
let p3 = partitioner.partition("topic", None, 3);
assert_ne!(p1, p3);
}
// Checks threshold construction (including clamping 0 to 1) and that the
// sticky partition changes automatically after every `batch_threshold`
// keyless records.
#[test]
fn test_sticky_partitioner_auto_advance() {
let partitioner = StickyPartitioner::with_batch_threshold(5);
assert_eq!(
partitioner.batch_threshold, 5,
"with_batch_threshold should set custom threshold"
);
let partitioner_min = StickyPartitioner::with_batch_threshold(0);
assert_eq!(
partitioner_min.batch_threshold, 1,
"with_batch_threshold(0) should clamp to 1"
);
let partition_count = 3;
// Calls 1 through 5 (this one plus the loop below) stay on the initial partition.
let initial = partitioner.partition("topic", None, partition_count);
for i in 1..5 {
let p = partitioner.partition("topic", None, partition_count);
assert_eq!(p, initial, "call {i} should still be on initial partition");
}
// The sixth call crosses the threshold and moves to a new partition.
let after_advance = partitioner.partition("topic", None, partition_count);
assert_ne!(
after_advance, initial,
"after batch_threshold calls, partition should auto-advance to a different partition"
);
// The next four calls complete the second batch on that partition.
for i in 0..4 {
let p = partitioner.partition("topic", None, partition_count);
assert_eq!(
p, after_advance,
"call {i} after advance should stay on new partition"
);
}
// The following call crosses the threshold again.
let after_second_advance = partitioner.partition("topic", None, partition_count);
assert_ne!(
after_second_advance, after_advance,
"should auto-advance again after another batch_threshold calls"
);
}
// Keyed records bypass stickiness: the same key maps to the same partition.
#[test]
fn test_sticky_partitioner_with_key() {
let partitioner = StickyPartitioner::new();
let p1 = partitioner.partition("topic", Some(b"key1"), 10);
let p2 = partitioner.partition("topic", Some(b"key1"), 10);
assert_eq!(p1, p2);
}
// The hash partitioner is deterministic for a key and sends keyless records to 0.
#[test]
fn test_hash_partitioner() {
let partitioner = HashPartitioner::new();
let p1 = partitioner.partition("topic", Some(b"key"), 10);
let p2 = partitioner.partition("topic", Some(b"key"), 10);
assert_eq!(p1, p2);
let p3 = partitioner.partition("topic", None, 10);
assert_eq!(p3, 0);
}
// Every partitioner returns 0 instead of dividing by zero when there are no partitions.
#[test]
fn test_partitioners_with_zero_partitions() {
let default = DefaultPartitioner::new();
let round_robin = RoundRobinPartitioner::new();
let sticky = StickyPartitioner::new();
let hash = HashPartitioner::new();
assert_eq!(default.partition("topic", Some(b"key"), 0), 0);
assert_eq!(round_robin.partition("topic", Some(b"key"), 0), 0);
assert_eq!(sticky.partition("topic", Some(b"key"), 0), 0);
assert_eq!(hash.partition("topic", Some(b"key"), 0), 0);
}
}