use crate::error::{ClusterError, Result};
use crate::routing::VSHARD_COUNT;
/// Routes a single tile to a vShard id based on the leading bits of its
/// Hilbert prefix.
///
/// The top `prefix_bits` bits of `hilbert_prefix` select a bucket, and the
/// bucket is then converted to a vShard id by `bucket_to_vshard`.
///
/// # Errors
///
/// Returns `ClusterError::Codec` when `prefix_bits` is 0 or greater than 16.
/// No bucket is computed in that case.
pub fn array_vshard_for_tile(hilbert_prefix: u64, prefix_bits: u8) -> Result<u32> {
    // The closure only runs once validation has succeeded, so the helpers
    // never see an out-of-range prefix width.
    validate_prefix_bits(prefix_bits)
        .map(|()| bucket_to_vshard(prefix_bucket(hilbert_prefix, prefix_bits), prefix_bits))
}
/// Lists the vShard ids a slice query has to contact, in ascending order and
/// without duplicates.
///
/// * `slice_hilbert_ranges` — inclusive `(lo, hi)` Hilbert-prefix ranges
///   covered by the slice. Every bucket between the bucket of `lo` and the
///   bucket of `hi` is mapped to a vShard. A range whose `lo` bucket is above
///   its `hi` bucket contributes nothing. An empty slice of ranges means
///   "no restriction": every `vshard_stride(prefix_bits)`-th id starting at 0
///   is returned.
/// * `prefix_bits` — number of leading Hilbert bits used for bucketing.
/// * `total_shards` — ids greater than or equal to this value are left out.
///   When it is 0 the result is empty and `prefix_bits` is not validated.
///
/// On both paths only ids below `min(total_shards, VSHARD_COUNT)` are
/// returned, so a range query can never name a shard that the unrestricted
/// query would not.
///
/// # Errors
///
/// Returns `ClusterError::Codec` when `total_shards` is non-zero and
/// `prefix_bits` is 0 or greater than 16.
pub fn array_vshards_for_slice(
    slice_hilbert_ranges: &[(u64, u64)],
    prefix_bits: u8,
    total_shards: u32,
) -> Result<Vec<u32>> {
    if total_shards == 0 {
        return Ok(Vec::new());
    }
    validate_prefix_bits(prefix_bits)?;

    // One upper bound shared by both paths below.
    let shard_limit = total_shards.min(VSHARD_COUNT);

    if slice_hilbert_ranges.is_empty() {
        // `vshard_stride` clamps its result to at least 1, so `step_by`
        // never sees a zero step. The cast only adapts the u32 stride to the
        // `usize` step that `step_by` takes.
        let step = vshard_stride(prefix_bits) as usize;
        return Ok((0..shard_limit).step_by(step).collect());
    }

    let mut vshards: Vec<u32> = slice_hilbert_ranges
        .iter()
        .flat_map(|&(lo, hi)| prefix_bucket(lo, prefix_bits)..=prefix_bucket(hi, prefix_bits))
        .map(|bucket| bucket_to_vshard(bucket, prefix_bits))
        .filter(|&vshard| vshard < shard_limit)
        .collect();

    // Ranges may overlap or share a bucket, so normalise the result.
    vshards.sort_unstable();
    vshards.dedup();
    Ok(vshards)
}
/// Checks that `prefix_bits` lies in the supported range 1–16.
///
/// # Errors
///
/// Returns `ClusterError::Codec` with a message containing the offending
/// value when `prefix_bits` is 0 or greater than 16.
fn validate_prefix_bits(prefix_bits: u8) -> Result<()> {
    if (1..=16).contains(&prefix_bits) {
        return Ok(());
    }
    Err(ClusterError::Codec {
        detail: format!("array routing: prefix_bits must be 1-16, got {prefix_bits}"),
    })
}
/// Extracts the top `prefix_bits` bits of `hilbert_prefix` as a bucket index.
///
/// The public entry points in this file validate `prefix_bits` to 1–16
/// before calling this helper. Values outside that range are still handled
/// without panicking:
///
/// * `prefix_bits == 0` selects no bits, so every prefix falls into bucket 0.
/// * More than 32 bits cannot be represented in the `u32` return type; the
///   result saturates at `u32::MAX` instead of silently wrapping.
fn prefix_bucket(hilbert_prefix: u64, prefix_bits: u8) -> u32 {
    let shift = 64u32.saturating_sub(u32::from(prefix_bits));
    // A shift by the full 64-bit width is an overflow for plain `>>` and
    // panics in debug builds; `checked_shr` reports it as `None` instead.
    let bucket = hilbert_prefix.checked_shr(shift).unwrap_or(0);
    // The clamp makes the narrowing cast lossless.
    bucket.min(u64::from(u32::MAX)) as u32
}
fn bucket_to_vshard(bucket: u32, prefix_bits: u8) -> u32 {
let stride = vshard_stride(prefix_bits);
bucket.saturating_mul(stride)
}
/// Distance between the vShard ids assigned to consecutive prefix buckets.
///
/// Computed as `VSHARD_COUNT` shifted right by `prefix_bits`, and never less
/// than 1 even when the shift leaves nothing.
fn vshard_stride(prefix_bits: u8) -> u32 {
    std::cmp::max(VSHARD_COUNT >> u32::from(prefix_bits), 1)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::routing::VSHARD_COUNT;

    /// Prefix widths just outside the accepted 1–16 window are rejected.
    #[test]
    fn tile_routing_invalid_prefix_bits() {
        for rejected in [0u8, 17] {
            assert!(array_vshard_for_tile(0, rejected).is_err());
        }
    }

    /// With one prefix bit the key space splits in two: the lower half is
    /// expected on vShard 0 and the upper half on vShard 512.
    #[test]
    fn tile_routing_top_bit_prefix1() {
        let lower_half = array_vshard_for_tile(0, 1).unwrap();
        let upper_half = array_vshard_for_tile(1u64 << 63, 1).unwrap();
        assert_eq!((lower_half, upper_half), (0, 512));
    }

    /// With ten prefix bits the bucket number is expected to be the vShard id.
    #[test]
    fn tile_routing_prefix10_is_direct() {
        // Lowest prefix of bucket 1 when the top 10 bits are used.
        let bucket_one = 1u64 << 54;
        assert_eq!(array_vshard_for_tile(bucket_one, 10).unwrap(), 1);
    }

    /// With eight prefix bits consecutive buckets are expected 4 ids apart.
    #[test]
    fn tile_routing_prefix8_stride4() {
        // Lowest prefix of bucket 1 when the top 8 bits are used.
        let bucket_one = 1u64 << 56;
        let routed = [
            array_vshard_for_tile(0, 8).unwrap(),
            array_vshard_for_tile(bucket_one, 8).unwrap(),
        ];
        assert_eq!(routed, [0, 4]);
    }

    /// For widths 1 through 10, sampled prefixes must route below `VSHARD_COUNT`.
    #[test]
    fn tile_routing_output_in_vshard_range() {
        let samples = [0u64, u64::MAX / 3, u64::MAX / 2, u64::MAX];
        for bits in 1u8..=10 {
            for prefix in samples {
                let vshard = array_vshard_for_tile(prefix, bits).unwrap();
                assert!(
                    vshard < VSHARD_COUNT,
                    "bits={bits} prefix={prefix:#x} → vshard={vshard} >= {VSHARD_COUNT}"
                );
            }
        }
    }

    /// A cluster with zero shards yields no targets.
    #[test]
    fn slice_routing_zero_shards() {
        let targets = array_vshards_for_slice(&[(0, u64::MAX)], 8, 0).unwrap();
        assert!(targets.is_empty());
    }

    /// No ranges at all: with 4 shards and 8 prefix bits only id 0 is expected.
    #[test]
    fn slice_routing_empty_ranges_returns_all() {
        let targets = array_vshards_for_slice(&[], 8, 4).unwrap();
        assert_eq!(targets, [0]);
    }

    /// The full key range with 4 shards and 8 prefix bits is expected to
    /// produce only id 0.
    #[test]
    fn slice_routing_full_range_returns_all_active() {
        let everything = [(0, u64::MAX)];
        let targets = array_vshards_for_slice(&everything, 8, 4).unwrap();
        assert_eq!(targets, [0]);
    }

    /// A degenerate range inside bucket 0 is expected to map to vShard 0.
    #[test]
    fn slice_routing_single_bucket_prefix8() {
        let targets = array_vshards_for_slice(&[(0, 0)], 8, 16).unwrap();
        assert_eq!(targets, [0]);
    }

    /// Two ranges that exactly cover buckets 0 and 1 (8 prefix bits) are
    /// expected to produce ids 0 and 4.
    #[test]
    fn slice_routing_two_adjacent_buckets() {
        // Number of prefixes per bucket when the top 8 bits are used.
        let bucket_width = 1u64 << 56;
        let ranges = [
            (0, bucket_width - 1),
            (bucket_width, 2 * bucket_width - 1),
        ];
        let targets = array_vshards_for_slice(&ranges, 8, 1024).unwrap();
        assert_eq!(targets, [0, 4]);
    }

    /// Ids at or above `total_shards` are left out of the result.
    #[test]
    fn slice_routing_clamped_to_total_shards() {
        let targets = array_vshards_for_slice(&[(0, u64::MAX)], 8, 2).unwrap();
        assert_eq!(targets, [0]);
    }

    /// Several ranges falling into the same bucket yield that vShard once.
    #[test]
    fn slice_routing_dedup_overlapping_ranges() {
        let ranges = [(0u64, 1u64), (2, 3)];
        let targets = array_vshards_for_slice(&ranges, 8, 16).unwrap();
        assert_eq!(targets, [0]);
    }

    /// Expected strides for a few prefix widths, including one wide enough
    /// that the stride bottoms out at 1.
    #[test]
    fn vshard_stride_values() {
        let expectations = [(1u8, 512u32), (8, 4), (10, 1), (16, 1)];
        for (bits, stride) in expectations {
            assert_eq!(vshard_stride(bits), stride);
        }
    }
}