Skip to main content

nodedb_cluster/distributed_array/
routing.rs

// SPDX-License-Identifier: BUSL-1.1

//! vShard routing for array tiles.
//!
//! Arrays are sharded by Hilbert-curve prefix: the top `prefix_bits`
//! of a tile's `hilbert_prefix` determine which vShard bucket owns it.
//!
//! With `prefix_bits = P` there are `2^P` distinct buckets numbered
//! `0 ..= 2^P - 1`. Each bucket maps to a contiguous range of vShards
//! within the cluster's `VSHARD_COUNT`-wide space. The stride is
//! `VSHARD_COUNT / 2^P` (integer division). Bucket `b` owns vShards
//! `[b * stride .. (b + 1) * stride)`. The primary vShard for a bucket
//! is the first one in that range (`b * stride`). This layout assumes
//! `2^P <= VSHARD_COUNT`; beyond that the integer-division stride is 0
//! and buckets outnumber vShards.
14
15use crate::error::{ClusterError, Result};
16use crate::routing::VSHARD_COUNT;
17
// Array Hilbert-range ownership is derived entirely from vShard ownership:
// bucket→vshard is a deterministic function of `prefix_bits`, and
// vshard→node is maintained by `RoutingTable` (updated by committed
// `RoutingChange::ReassignVShard` entries). No separate array-specific
// routing variant is needed in the metadata log — array migration is "free"
// because a vShard rebalance already atomically re-points all array tile
// ownership through the same `RoutingTable` cut-over path.
25
26/// Determine the primary owning vShard for a single tile identified by
27/// its raw Hilbert-prefix value.
28///
29/// `hilbert_prefix` — the tile's u64 Hilbert key.
30/// `prefix_bits` — number of high-order bits used for routing (1–16).
31///
32/// Returns the vShard ID in `[0, VSHARD_COUNT)`.
33pub fn array_vshard_for_tile(hilbert_prefix: u64, prefix_bits: u8) -> Result<u32> {
34    validate_prefix_bits(prefix_bits)?;
35    let bucket = prefix_bucket(hilbert_prefix, prefix_bits);
36    Ok(bucket_to_vshard(bucket, prefix_bits))
37}
38
39/// Determine all vShards that intersect a set of Hilbert-prefix ranges.
40///
41/// `slice_hilbert_ranges` — list of `(lo, hi)` inclusive Hilbert-prefix
42/// ranges covering the slice's MBR. An empty slice means "unbounded"
43/// (returns all vShards up to `total_shards`).
44///
45/// `prefix_bits` — routing granularity (1–16).
46/// `total_shards` — number of active vShards; returned IDs are
47/// clamped to `[0, total_shards)`.
48///
49/// Returns a sorted, deduplicated list of vShard IDs.
50pub fn array_vshards_for_slice(
51    slice_hilbert_ranges: &[(u64, u64)],
52    prefix_bits: u8,
53    total_shards: u32,
54) -> Result<Vec<u32>> {
55    if total_shards == 0 {
56        return Ok(Vec::new());
57    }
58    validate_prefix_bits(prefix_bits)?;
59
60    let stride = vshard_stride(prefix_bits);
61    let mut out: Vec<u32> = Vec::new();
62
63    // Unbounded slice: return one canonical primary vShard per bucket.
64    // Within a bucket all vShards serve the same Hilbert range, so fanning
65    // out to every member would multiply the result by `stride`.
66    if slice_hilbert_ranges.is_empty() {
67        let mut vshard_id: u32 = 0;
68        while vshard_id < total_shards && vshard_id < VSHARD_COUNT {
69            out.push(vshard_id);
70            vshard_id = vshard_id.saturating_add(stride);
71            if stride == 0 {
72                break;
73            }
74        }
75        return Ok(out);
76    }
77
78    for &(lo, hi) in slice_hilbert_ranges {
79        let lo_bucket = prefix_bucket(lo, prefix_bits);
80        // `hi` is inclusive, so we take the bucket that contains `hi`.
81        let hi_bucket = prefix_bucket(hi, prefix_bits);
82
83        for bucket in lo_bucket..=hi_bucket {
84            // One canonical primary vShard per bucket. All other vShards in
85            // `[vshard_start, vshard_start + stride)` serve identical Hilbert
86            // ranges; fanning out to all of them double-counts on aggregate.
87            let vshard_start = bucket_to_vshard(bucket, prefix_bits);
88            if vshard_start < total_shards {
89                out.push(vshard_start);
90            }
91        }
92    }
93
94    out.sort_unstable();
95    out.dedup();
96    Ok(out)
97}
98
// ── Internal helpers ──────────────────────────────────────────────────────
100
101fn validate_prefix_bits(prefix_bits: u8) -> Result<()> {
102    if prefix_bits == 0 || prefix_bits > 16 {
103        return Err(ClusterError::Codec {
104            detail: format!("array routing: prefix_bits must be 1-16, got {prefix_bits}"),
105        });
106    }
107    Ok(())
108}
109
/// Extract the top `prefix_bits` bits of a Hilbert prefix as a bucket index.
///
/// `prefix_bits == 0` yields bucket 0 (one bucket covers everything) rather
/// than shifting a `u64` by 64, which panics in debug builds and degrades to
/// a no-op shift in release builds. The public entry points validate
/// `prefix_bits` to `1..=16` before calling this, so the result fits in `u32`.
fn prefix_bucket(hilbert_prefix: u64, prefix_bits: u8) -> u32 {
    // Shift right so the top `prefix_bits` bits land in the low-order position.
    let shift = u32::from(64u8.saturating_sub(prefix_bits));
    hilbert_prefix.checked_shr(shift).unwrap_or(0) as u32
}
116
117/// Map a bucket index to the primary vShard in that bucket's range.
118fn bucket_to_vshard(bucket: u32, prefix_bits: u8) -> u32 {
119    let stride = vshard_stride(prefix_bits);
120    bucket.saturating_mul(stride)
121}
122
123/// Number of vShards per Hilbert bucket.
124///
125/// With `P` prefix bits there are `2^P` buckets and `VSHARD_COUNT`
126/// total vShards. Stride = `VSHARD_COUNT >> P`. When `P >= log2(VSHARD_COUNT)`
127/// the stride is 1 (one vShard per bucket or less).
128fn vshard_stride(prefix_bits: u8) -> u32 {
129    // VSHARD_COUNT is a power of two (1024 = 2^10).
130    // Right-shifting by prefix_bits gives the stride, floored at 1.
131    let shifted = VSHARD_COUNT >> (prefix_bits as u32);
132    shifted.max(1)
133}
134
#[cfg(test)]
mod tests {
    use super::*;
    use crate::routing::VSHARD_COUNT;

    #[test]
    fn tile_routing_invalid_prefix_bits() {
        assert!(array_vshard_for_tile(0, 0).is_err());
        assert!(array_vshard_for_tile(0, 17).is_err());
    }

    #[test]
    fn tile_routing_top_bit_prefix1() {
        // With prefix_bits=1: stride=512. Bucket 0 → vshard 0, bucket 1 → vshard 512.
        let shard_low = array_vshard_for_tile(0x0000_0000_0000_0000, 1).unwrap();
        let shard_high = array_vshard_for_tile(0x8000_0000_0000_0000, 1).unwrap();
        assert_eq!(shard_low, 0);
        assert_eq!(shard_high, 512);
    }

    #[test]
    fn tile_routing_prefix10_is_direct() {
        // With prefix_bits=10: stride=1, bucket == vshard_id.
        // 0x0040_0000_0000_0000 == 1 << 54, so its top 10 bits
        // (value >> (64 - 10) = value >> 54) are 0b00_0000_0001 == 1.
        let vshard = array_vshard_for_tile(0x0040_0000_0000_0000, 10).unwrap();
        assert_eq!(vshard, 1);
    }

    #[test]
    fn tile_routing_prefix8_stride4() {
        // With prefix_bits=8: stride=4. Bucket 0 → vshard 0, bucket 1 → vshard 4.
        let shard0 = array_vshard_for_tile(0x0000_0000_0000_0000, 8).unwrap();
        // Top 8 bits of 0x0100... = 1.
        let shard1 = array_vshard_for_tile(0x0100_0000_0000_0000, 8).unwrap();
        assert_eq!(shard0, 0);
        assert_eq!(shard1, 4);
    }

    #[test]
    fn tile_routing_output_in_vshard_range() {
        // Every vshard returned must be < VSHARD_COUNT.
        for bits in 1u8..=10 {
            for &prefix in &[0u64, u64::MAX / 3, u64::MAX / 2, u64::MAX] {
                let vshard = array_vshard_for_tile(prefix, bits).unwrap();
                assert!(
                    vshard < VSHARD_COUNT,
                    "bits={bits} prefix={prefix:#x} → vshard={vshard} >= {VSHARD_COUNT}"
                );
            }
        }
    }

    #[test]
    fn slice_routing_zero_shards() {
        let shards = array_vshards_for_slice(&[(0, u64::MAX)], 8, 0).unwrap();
        assert!(shards.is_empty());
    }

    #[test]
    fn slice_routing_invalid_prefix_bits() {
        assert!(array_vshards_for_slice(&[(0, 1)], 0, 16).is_err());
        assert!(array_vshards_for_slice(&[(0, 1)], 17, 16).is_err());
        assert!(array_vshards_for_slice(&[], 0, 16).is_err());
    }

    #[test]
    fn slice_routing_empty_ranges_returns_all() {
        // Empty range list = unbounded slice → one canonical primary per bucket.
        // With prefix_bits=8 (stride=4) and total_shards=4, only primary 0 fits.
        let shards = array_vshards_for_slice(&[], 8, 4).unwrap();
        assert_eq!(shards, vec![0]);
    }

    #[test]
    fn slice_routing_unbounded_prefix1() {
        // prefix_bits=1: stride=512, so the two bucket primaries are 0 and 512.
        let shards = array_vshards_for_slice(&[], 1, VSHARD_COUNT).unwrap();
        assert_eq!(shards, vec![0, 512]);
    }

    #[test]
    fn slice_routing_unbounded_matches_full_range() {
        // An unbounded slice and an explicit [0, u64::MAX] range must agree,
        // yielding exactly one primary per bucket (2^bits of them).
        for bits in 1u8..=10 {
            let unbounded = array_vshards_for_slice(&[], bits, VSHARD_COUNT).unwrap();
            let full = array_vshards_for_slice(&[(0, u64::MAX)], bits, VSHARD_COUNT).unwrap();
            assert_eq!(unbounded, full, "bits={bits}");
            assert_eq!(unbounded.len(), 1usize << bits, "bits={bits}");
        }
    }

    #[test]
    fn slice_routing_full_range_returns_all_active() {
        // Full range covers all 256 buckets → one primary per bucket.
        // total_shards=4 only fits primary 0.
        let shards = array_vshards_for_slice(&[(0, u64::MAX)], 8, 4).unwrap();
        assert_eq!(shards, vec![0]);
    }

    #[test]
    fn slice_routing_single_bucket_prefix8() {
        // Hilbert range [0, 0] covers only bucket 0 → primary vShard 0.
        let shards = array_vshards_for_slice(&[(0, 0)], 8, 16).unwrap();
        assert_eq!(shards, vec![0]);
    }

    #[test]
    fn slice_routing_point_range_contains_tile_owner() {
        // A single-point slice must route to exactly the tile's owning vShard.
        for bits in 1u8..=10 {
            for &prefix in &[0u64, u64::MAX / 3, u64::MAX / 2, u64::MAX] {
                let owner = array_vshard_for_tile(prefix, bits).unwrap();
                let shards =
                    array_vshards_for_slice(&[(prefix, prefix)], bits, VSHARD_COUNT).unwrap();
                assert_eq!(shards, vec![owner], "bits={bits} prefix={prefix:#x}");
            }
        }
    }

    #[test]
    fn slice_routing_two_adjacent_buckets() {
        // prefix_bits=8, stride=4. One canonical primary per bucket.
        // Range 1 → bucket 0 → primary vShard 0.
        // Range 2 → bucket 1 → primary vShard 4.
        let shards = array_vshards_for_slice(
            &[
                (0x0000_0000_0000_0000, 0x00FF_FFFF_FFFF_FFFF),
                (0x0100_0000_0000_0000, 0x01FF_FFFF_FFFF_FFFF),
            ],
            8,
            1024,
        )
        .unwrap();
        assert_eq!(shards, vec![0, 4]);
    }

    #[test]
    fn slice_routing_clamped_to_total_shards() {
        // With total_shards=2, only the primary for bucket 0 (= vShard 0) fits.
        let shards = array_vshards_for_slice(&[(0, u64::MAX)], 8, 2).unwrap();
        assert_eq!(shards, vec![0]);
    }

    #[test]
    fn slice_routing_dedup_overlapping_ranges() {
        // Two ranges that map to the same bucket → one canonical primary vShard.
        let shards = array_vshards_for_slice(
            &[
                (0x0000_0000_0000_0000, 0x0000_0000_0000_0001),
                (0x0000_0000_0000_0002, 0x0000_0000_0000_0003),
            ],
            8,
            16,
        )
        .unwrap();
        // Both ranges are in bucket 0 → primary vShard 0.
        assert_eq!(shards, vec![0]);
    }

    #[test]
    fn vshard_stride_values() {
        // VSHARD_COUNT=1024.
        assert_eq!(vshard_stride(1), 512);
        assert_eq!(vshard_stride(8), 4);
        assert_eq!(vshard_stride(10), 1);
        assert_eq!(vshard_stride(16), 1); // floored at 1
    }
}