use std::hash::Hasher;
use std::sync::Arc;
use twox_hash::XxHash64;
/// Seed for the `XxHash64` hasher used by `shard_for`.
///
/// Changing this value changes the shard that every execution id maps to, so all
/// instances that partition the same ids must be built with the same seed.
const SHARD_HASH_SEED: u64 = 0;
/// Identifies which shard (`shard_index`) out of `shard_count` this instance serves.
///
/// NOTE(review): presumably populated from `NOETL_SHARD_INDEX` / `NOETL_SHARD_COUNT`
/// (the names used in `ShardConfigError` messages) — confirm at the call site.
/// The fields are public, so constructing the struct directly bypasses the
/// range checks performed by `ShardConfig::new`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ShardConfig {
    /// Zero-based index of this shard; `new` requires it to be `< shard_count`.
    pub shard_index: u32,
    /// Total number of shards; `new` requires it to be `>= 1`.
    pub shard_count: u32,
}

impl Default for ShardConfig {
    /// The single-shard configuration: shard 0 of 1.
    ///
    /// With `shard_count == 1`, `owns` returns `true` for every execution id.
    fn default() -> Self {
        ShardConfig {
            shard_count: 1,
            shard_index: 0,
        }
    }
}
impl ShardConfig {
    /// Builds a validated shard configuration.
    ///
    /// # Errors
    ///
    /// * [`ShardConfigError::ZeroShardCount`] when `shard_count` is 0 (checked first).
    /// * [`ShardConfigError::IndexOutOfRange`] when `shard_index >= shard_count`.
    pub fn new(shard_index: u32, shard_count: u32) -> Result<Self, ShardConfigError> {
        match shard_count {
            0 => Err(ShardConfigError::ZeroShardCount),
            count if shard_index >= count => Err(ShardConfigError::IndexOutOfRange {
                shard_index,
                shard_count: count,
            }),
            _ => Ok(Self {
                shard_index,
                shard_count,
            }),
        }
    }

    /// Moves this configuration into a reference-counted [`Arc`].
    pub fn into_arc(self) -> Arc<Self> {
        Arc::from(self)
    }

    /// Reports whether `execution_id` is assigned to this shard.
    ///
    /// When `shard_count <= 1`, every id is owned. Otherwise an id is owned exactly
    /// when `shard_for(execution_id, shard_count) == shard_index`.
    pub fn owns(&self, execution_id: i64) -> bool {
        let unsharded = self.shard_count <= 1;
        unsharded || shard_for(execution_id, self.shard_count) == self.shard_index
    }
}
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum ShardConfigError {
#[error("NOETL_SHARD_COUNT must be >= 1; got 0")]
ZeroShardCount,
#[error(
"NOETL_SHARD_INDEX {shard_index} >= NOETL_SHARD_COUNT {shard_count}; \
shard_index must be in 0..shard_count"
)]
IndexOutOfRange { shard_index: u32, shard_count: u32 },
}
/// Maps `execution_id` to a shard in `0..shard_count`.
///
/// The id's little-endian bytes are hashed with `XxHash64` (seeded with
/// `SHARD_HASH_SEED`), and the digest is reduced modulo `shard_count`. Using a fixed
/// byte order keeps the result independent of host endianness.
///
/// Returns 0 when `shard_count` is 0 or 1.
pub fn shard_for(execution_id: i64, shard_count: u32) -> u32 {
    match shard_count {
        0 | 1 => 0,
        buckets => {
            let mut hasher = XxHash64::with_seed(SHARD_HASH_SEED);
            hasher.write(&execution_id.to_le_bytes());
            let slot = hasher.finish() % u64::from(buckets);
            // `slot < buckets <= u32::MAX`, so narrowing back to u32 cannot truncate.
            slot as u32
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Repeated calls with the same inputs must return the same shard.
    ///
    /// NOTE(review): `expected` is computed by `shard_for` itself, so this only
    /// proves determinism within one process. Agreement across processes,
    /// platforms, or hash-crate upgrades would need hard-coded golden values.
    /// TODO: pin golden values once they are captured from a reference build.
    #[test]
    fn shard_for_is_stable_across_calls() {
        let cases: &[(i64, u32, u32)] = &[
            (1, 16, shard_for(1, 16)),
            (320816801799737344, 16, shard_for(320816801799737344, 16)),
            (i64::MAX, 16, shard_for(i64::MAX, 16)),
        ];
        for (eid, n, expected) in cases {
            for _ in 0..100 {
                assert_eq!(shard_for(*eid, *n), *expected);
            }
        }
    }

    /// Consecutive ids must spread across shards within +/-20% of the mean.
    #[test]
    fn shard_for_distributes_evenly_across_16_shards() {
        const N: u32 = 16;
        const TOTAL: usize = 10_000;
        let base = 320_816_801_799_737_344_i64;
        let mut counts = [0_usize; N as usize];
        for i in 0..TOTAL {
            let eid = base + (i as i64);
            let shard = shard_for(eid, N) as usize;
            counts[shard] += 1;
        }
        let mean = TOTAL / N as usize;
        let tolerance = mean / 5;
        let lo = mean - tolerance;
        let hi = mean + tolerance;
        for (i, c) in counts.iter().enumerate() {
            assert!(
                *c >= lo && *c <= hi,
                "shard {i} count {c} outside [{lo}, {hi}] (mean {mean}); distribution is biased"
            );
        }
    }

    /// Negative ids (including `i64::MIN`) must still land in range.
    #[test]
    fn shard_for_handles_negative_execution_ids() {
        for eid in [-1_i64, i64::MIN, i64::MIN + 1, -42] {
            for n in [1, 4, 16, 1024] {
                let shard = shard_for(eid, n);
                assert!(shard < n, "shard {shard} >= shard_count {n} for eid={eid}");
            }
        }
    }

    #[test]
    fn shard_for_one_shard_returns_zero() {
        for eid in [0, 1, i64::MAX, -1, i64::MIN] {
            assert_eq!(shard_for(eid, 1), 0);
        }
    }

    #[test]
    fn shard_for_zero_shards_returns_zero() {
        assert_eq!(shard_for(42, 0), 0);
    }

    #[test]
    fn owns_is_true_when_shard_count_is_one() {
        let cfg = ShardConfig::default();
        assert_eq!(cfg.shard_count, 1);
        for eid in [0, 1, 320_816_801_799_737_344, i64::MAX, -1] {
            assert!(cfg.owns(eid), "owns({eid}) should be true under no-sharding default");
        }
    }

    #[test]
    fn owns_matches_shard_for_when_shard_count_is_greater_than_one() {
        let n = 16;
        let cfg_for = |idx| ShardConfig::new(idx, n).unwrap();
        let eids: &[i64] = &[1, 42, 320_816_801_799_737_344, i64::MAX, -1];
        for eid in eids {
            let expected = shard_for(*eid, n);
            for idx in 0..n {
                let cfg = cfg_for(idx);
                assert_eq!(
                    cfg.owns(*eid),
                    idx == expected,
                    "ShardConfig {{ index={idx}, count={n} }}.owns({eid}) disagrees with shard_for"
                );
            }
        }
    }

    /// Every id must be owned by exactly one replica in a full set of replicas.
    #[test]
    fn owns_partitions_executions_across_replicas() {
        const N: u32 = 4;
        const TOTAL: usize = 10_000;
        let replicas: Vec<ShardConfig> = (0..N).map(|i| ShardConfig::new(i, N).unwrap()).collect();
        let base = 320_816_801_799_737_344_i64;
        for i in 0..TOTAL {
            let eid = base + i as i64;
            let owners: usize = replicas.iter().filter(|r| r.owns(eid)).count();
            assert_eq!(
                owners, 1,
                "execution_id {eid} should be owned by exactly one replica, got {owners}"
            );
        }
    }

    #[test]
    fn new_rejects_zero_shard_count() {
        let err = ShardConfig::new(0, 0).unwrap_err();
        assert_eq!(err, ShardConfigError::ZeroShardCount);
    }

    #[test]
    fn new_rejects_index_at_or_above_count() {
        let err = ShardConfig::new(4, 4).unwrap_err();
        assert_eq!(
            err,
            ShardConfigError::IndexOutOfRange {
                shard_index: 4,
                shard_count: 4
            }
        );
        // assert_eq! (rather than matches!) so a failure prints the actual error.
        let err = ShardConfig::new(5, 4).unwrap_err();
        assert_eq!(
            err,
            ShardConfigError::IndexOutOfRange {
                shard_index: 5,
                shard_count: 4
            }
        );
    }

    #[test]
    fn new_accepts_valid_config() {
        let cfg = ShardConfig::new(3, 4).expect("valid");
        assert_eq!(cfg.shard_index, 3);
        assert_eq!(cfg.shard_count, 4);
    }

    #[test]
    fn new_accepts_single_shard_at_index_zero() {
        let cfg = ShardConfig::new(0, 1).expect("single-shard valid");
        assert_eq!(cfg.shard_index, 0);
        assert_eq!(cfg.shard_count, 1);
        assert!(cfg.owns(42));
    }

    /// `Default` must agree with the validated single-shard constructor.
    #[test]
    fn default_matches_single_shard_constructor() {
        assert_eq!(ShardConfig::default(), ShardConfig::new(0, 1).unwrap());
    }

    /// `into_arc` must wrap the configuration unchanged.
    #[test]
    fn into_arc_preserves_config() {
        let cfg = ShardConfig::new(2, 8).unwrap();
        let shared = cfg.into_arc();
        assert_eq!(*shared, cfg);
    }

    /// The `Display` text is what operators see for a misconfigured deployment,
    /// so pin it exactly (including the line-continued message).
    #[test]
    fn error_messages_are_stable() {
        assert_eq!(
            ShardConfigError::ZeroShardCount.to_string(),
            "NOETL_SHARD_COUNT must be >= 1; got 0"
        );
        assert_eq!(
            ShardConfigError::IndexOutOfRange {
                shard_index: 4,
                shard_count: 4
            }
            .to_string(),
            "NOETL_SHARD_INDEX 4 >= NOETL_SHARD_COUNT 4; shard_index must be in 0..shard_count"
        );
    }
}