/// Computes a 32-bit MurmurHash2-style digest of `data` using the fixed seed `0x9747b28c`.
///
/// The input is consumed as little-endian 32-bit words. Any 1–3 trailing bytes are
/// folded in afterwards, and a final shift/multiply avalanche is applied. All
/// multiplications wrap modulo 2^32, so the function never panics.
///
/// The input length is mixed into the initial state as `data.len() as u32`; on targets
/// where `usize` is wider than 32 bits, lengths above `u32::MAX` are truncated.
///
/// NOTE(review): the seed and tail handling look like the variant used by Apache Kafka's
/// default partitioner — confirm before relying on cross-client compatibility.
pub fn murmur2(data: &[u8]) -> u32 {
    const SEED: u32 = 0x9747b28c;
    const M: u32 = 0x5bd1e995;
    const R: u32 = 24;

    let words = data.chunks_exact(4);
    // Bytes left over after the last full 4-byte word (0 to 3 of them).
    let tail = words.remainder();

    // Body: mix each full little-endian word into the running state.
    let mut hash = words.fold(SEED ^ (data.len() as u32), |state, word| {
        let mut k = u32::from_le_bytes([word[0], word[1], word[2], word[3]]).wrapping_mul(M);
        k ^= k >> R;
        state.wrapping_mul(M) ^ k.wrapping_mul(M)
    });

    // Tail: pack the leftover bytes little-endian (first byte lowest) and mix once.
    // The byte lanes are disjoint, so OR-ing them together before the XOR is the
    // same as XOR-ing each shifted byte into the state individually.
    if !tail.is_empty() {
        let packed = tail
            .iter()
            .rev()
            .fold(0u32, |bits, &byte| (bits << 8) | u32::from(byte));
        hash = (hash ^ packed).wrapping_mul(M);
    }

    // Final avalanche.
    hash ^= hash >> 13;
    hash = hash.wrapping_mul(M);
    hash ^ (hash >> 15)
}
/// Maps `key` to a partition index in `0..num_partitions`.
///
/// The key is hashed with [`murmur2`], the top bit of the hash is cleared, and the
/// result is reduced modulo `num_partitions`.
///
/// # Panics
///
/// Panics if `num_partitions` is zero, because the remainder operation divides by zero.
#[inline]
pub fn murmur2_partition(key: &[u8], num_partitions: u32) -> u32 {
    // Mask that keeps the low 31 bits of the hash and drops the top bit.
    const LOW_31_BITS: u32 = 0x7fff_ffff;

    let masked_hash = murmur2(key) & LOW_31_BITS;
    masked_hash % num_partitions
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins `murmur2` to fixed reference outputs for a few byte strings.
    #[test]
    fn test_murmur2_known_vectors() {
        let vectors: [(&[u8], u32); 3] = [
            (b"", 275646681),
            (b"hello", 2132663229),
            (b"kafka", 3496464228),
        ];
        for &(input, expected) in &vectors {
            assert_eq!(murmur2(input), expected);
        }
    }

    /// The same key and partition count must always yield the same in-range partition.
    #[test]
    fn test_murmur2_partition_deterministic() {
        const PARTITIONS: u32 = 10;
        let key: &[u8] = b"user-123";

        let first = murmur2_partition(key, PARTITIONS);
        let second = murmur2_partition(key, PARTITIONS);

        assert_eq!(first, second);
        assert!(first < PARTITIONS);
    }

    /// Hashing 1000 sequential big-endian integers must reach every one of 8 partitions.
    #[test]
    fn test_murmur2_partition_distribution() {
        let mut hits = [0u32; 8];
        (0..1000u32)
            .map(|n| murmur2_partition(&n.to_be_bytes(), 8))
            .for_each(|partition| hits[partition as usize] += 1);

        for &seen in hits.iter() {
            assert!(seen > 0, "Partition got zero keys — bad distribution");
        }
    }
}