#![allow(dead_code)]
const SEED: u32 = 0x9747_b28c;
const M: u32 = 0x5bd1_e995;
const R: u32 = 24;
#[allow(clippy::cast_possible_truncation)]
fn murmur2(data: &[u8]) -> u32 {
let length = data.len();
let mut h: u32 = SEED ^ (length as u32);
let chunks = data.chunks_exact(4);
let rem = chunks.remainder();
for chunk in chunks {
let mut k = u32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]);
k = k.wrapping_mul(M);
k ^= k >> R;
k = k.wrapping_mul(M);
h = h.wrapping_mul(M);
h ^= k;
}
match rem.len() {
3 => {
h ^= u32::from(rem[2]) << 16;
h ^= u32::from(rem[1]) << 8;
h ^= u32::from(rem[0]);
h = h.wrapping_mul(M);
}
2 => {
h ^= u32::from(rem[1]) << 8;
h ^= u32::from(rem[0]);
h = h.wrapping_mul(M);
}
1 => {
h ^= u32::from(rem[0]);
h = h.wrapping_mul(M);
}
_ => {}
}
h ^= h >> 13;
h = h.wrapping_mul(M);
h ^= h >> 15;
h
}
pub fn partition_for_tid(transactional_id: &str, num_partitions: i32) -> i32 {
let h = murmur2(transactional_id.as_bytes()).cast_signed();
let abs = if h == i32::MIN { 0 } else { h.abs() };
abs % num_partitions
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
#[test]
fn matches_jvm_for_canonical_tids() {
let cases: &[(&str, i32)] = &[("my-tid", 43), ("producer-1", 45), ("tx-orders-prod", 26)];
for (tid, expected) in cases {
assert!(
partition_for_tid(tid, 50) == *expected,
"tid `{tid}` should hash to partition {expected}"
);
}
}
#[test]
fn always_in_bounds() {
for s in [
"",
"a",
"really-long-transactional-id-with-many-bytes-and-symbols-!@#$%",
] {
for n in [1, 50, 256] {
let p = partition_for_tid(s, n);
assert!((0..n).contains(&p));
}
}
}
#[test]
fn min_value_input_does_not_break_bounds() {
let long_repeated = "x".repeat(64);
let inputs: &[&str] = &[
"",
"a",
"tid",
"transactional-id-123",
&long_repeated,
"00000000",
"1111111111111111",
"deadbeef",
"very-long-string-that-might-trigger-edge-cases-in-murmur2-mixing",
];
for s in inputs {
for num_partitions in [1, 3, 50, 256] {
let p = partition_for_tid(s, num_partitions);
assert!(
(0..num_partitions).contains(&p),
"tid={s:?}, num_partitions={num_partitions} produced p={p} (out of bounds)"
);
}
}
}
}