crabka-broker 0.3.6

Single-node Apache Kafka-compatible broker (MVP)
Documentation
//! Transactional-id partition routing for `__transaction_state`, computed as
//! `abs(murmur2(transactional_id)) % num_partitions` — Apache Kafka's
//! `Utils.abs(murmur2(...)) % numPartitions` convention. Matches the
//! JVM client so a tid hashes to the same `__transaction_state`
//! partition on Crabka as it does on Apache Kafka.

// The constants and murmur2 helper are retained for transaction-state
// partition routing parity with Apache Kafka.
#![allow(dead_code)]

/// Seed XORed with the input length to form the initial hash state.
const SEED: u32 = 0x9747_b28c;
/// Multiplier applied (with wrapping) in every mixing step.
const M: u32 = 0x5bd1_e995;
/// Right-shift used when scrambling each 4-byte word.
const R: u32 = 24;

/// 32-bit murmur2 hash of `data`.
///
/// The input is consumed as little-endian 4-byte words; the 1–3 leftover
/// bytes (if any) are folded in afterwards, followed by a final avalanche.
/// All multiplications wrap modulo 2^32.
// Only the low 32 bits of the length take part in the hash, which is what
// the JVM's int-typed length does, so the truncating cast is deliberate.
#[allow(clippy::cast_possible_truncation)]
fn murmur2(data: &[u8]) -> u32 {
    // Scramble one word before it is merged into the running state.
    let mix = |word: u32| {
        let scaled = word.wrapping_mul(M);
        (scaled ^ (scaled >> R)).wrapping_mul(M)
    };

    let words = data.chunks_exact(4);
    let tail = words.remainder();

    let body = words.fold(SEED ^ (data.len() as u32), |state, quad| {
        let word = u32::from_le_bytes([quad[0], quad[1], quad[2], quad[3]]);
        state.wrapping_mul(M) ^ mix(word)
    });

    // Leftover byte `i` lands at bit offset `8 * i`. XOR is order-independent,
    // so the bytes can be folded front to back. The extra multiply happens
    // only when at least one leftover byte exists.
    let folded = if tail.is_empty() {
        body
    } else {
        tail.iter()
            .zip([0u32, 8, 16])
            .fold(body, |state, (&byte, shift)| state ^ (u32::from(byte) << shift))
            .wrapping_mul(M)
    };

    // Final avalanche.
    let stage = (folded ^ (folded >> 13)).wrapping_mul(M);
    stage ^ (stage >> 15)
}

/// Pick the `__transaction_state` partition index for `transactional_id`.
///
/// The id's UTF-8 bytes are hashed with `murmur2` and the 32-bit result is
/// reinterpreted as a signed integer, the way a JVM `int` would hold it.
/// Its magnitude is then taken with the `Utils.abs(int)` rule, where
/// `Integer.MIN_VALUE` becomes 0 because it has no positive counterpart,
/// and reduced modulo `num_partitions`.
///
/// For a positive `num_partitions` the result lies in `0..num_partitions`.
///
/// # Panics
///
/// Panics if `num_partitions` is zero (remainder by zero).
pub fn partition_for_tid(transactional_id: &str, num_partitions: i32) -> i32 {
    let signed_hash = murmur2(transactional_id.as_bytes()).cast_signed();
    // `checked_abs` yields `None` only for `i32::MIN`, which is exactly the
    // case `Utils.abs` maps to 0.
    let magnitude = signed_hash.checked_abs().unwrap_or(0);
    magnitude % num_partitions
}

#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    // Expected partitions come from the reference JVM implementation:
    //   Utils.abs(Utils.murmur2(tid.getBytes(StandardCharsets.UTF_8))) % 50
    // with Utils = org.apache.kafka.common.utils.Utils.
    #[test]
    fn matches_jvm_for_canonical_tids() {
        const JVM_VECTORS: [(&str, i32); 3] =
            [("my-tid", 43), ("producer-1", 45), ("tx-orders-prod", 26)];
        for (tid, expected) in JVM_VECTORS {
            assert!(
                partition_for_tid(tid, 50) == expected,
                "tid `{tid}` should hash to partition {expected}"
            );
        }
    }

    // Every (tid, partition count) pair must yield an index in `0..count`.
    #[test]
    fn always_in_bounds() {
        let tids = [
            "",
            "a",
            "really-long-transactional-id-with-many-bytes-and-symbols-!@#$%",
        ];
        let partition_counts = [1, 50, 256];
        for tid in tids {
            for count in partition_counts {
                let p = partition_for_tid(tid, count);
                assert!((0..count).contains(&p));
            }
        }
    }

    #[test]
    fn min_value_input_does_not_break_bounds() {
        // No tid known to hash to exactly i32::MIN is available, so the
        // Utils.abs guard is not hit directly here. Instead, a varied set
        // of inputs is checked to never produce a negative or out-of-range
        // partition.
        let long_repeated = "x".repeat(64);
        let inputs = [
            "",
            "a",
            "tid",
            "transactional-id-123",
            long_repeated.as_str(),
            "00000000",
            "1111111111111111",
            "deadbeef",
            "very-long-string-that-might-trigger-edge-cases-in-murmur2-mixing",
        ];
        for s in inputs {
            for num_partitions in [1, 3, 50, 256] {
                let p = partition_for_tid(s, num_partitions);
                assert!(
                    (0..num_partitions).contains(&p),
                    "tid={s:?}, num_partitions={num_partitions} produced p={p} (out of bounds)"
                );
            }
        }
    }
}