nodedb_cluster/distributed_array/routing.rs
1// SPDX-License-Identifier: BUSL-1.1
2
3//! vShard routing for array tiles.
4//!
5//! Arrays are sharded by Hilbert-curve prefix: the top `prefix_bits`
6//! of a tile's `hilbert_prefix` determine which vShard bucket owns it.
7//!
8//! With `prefix_bits = P` there are `2^P` distinct buckets numbered
9//! `0 .. 2^P - 1`. Each bucket maps to a contiguous range of vShards
10//! within the cluster's `VSHARD_COUNT`-wide space. The stride is
11//! `VSHARD_COUNT / 2^P` (integer division). Bucket `b` owns vShards
12//! `[b * stride .. (b + 1) * stride)`. The primary vShard for a bucket
13//! is the first one in that range (`b * stride`).
14
15use crate::error::{ClusterError, Result};
16use crate::routing::VSHARD_COUNT;
17
18// Array Hilbert-range ownership is derived entirely from vShard ownership:
19// bucket→vshard is a deterministic function of `prefix_bits`, and
20// vshard→node is maintained by `RoutingTable` (updated by committed
21// `RoutingChange::ReassignVShard` entries). No separate array-specific
22// routing variant is needed in the metadata log — array migration is "free"
23// because a vShard rebalance already atomically re-points all array tile
24// ownership through the same `RoutingTable` cut-over path.
25
26/// Determine the primary owning vShard for a single tile identified by
27/// its raw Hilbert-prefix value.
28///
29/// `hilbert_prefix` — the tile's u64 Hilbert key.
30/// `prefix_bits` — number of high-order bits used for routing (1–16).
31///
32/// Returns the vShard ID in `[0, VSHARD_COUNT)`.
33pub fn array_vshard_for_tile(hilbert_prefix: u64, prefix_bits: u8) -> Result<u32> {
34 validate_prefix_bits(prefix_bits)?;
35 let bucket = prefix_bucket(hilbert_prefix, prefix_bits);
36 Ok(bucket_to_vshard(bucket, prefix_bits))
37}
38
39/// Determine all vShards that intersect a set of Hilbert-prefix ranges.
40///
41/// `slice_hilbert_ranges` — list of `(lo, hi)` inclusive Hilbert-prefix
42/// ranges covering the slice's MBR. An empty slice means "unbounded"
43/// (returns all vShards up to `total_shards`).
44///
45/// `prefix_bits` — routing granularity (1–16).
46/// `total_shards` — number of active vShards; returned IDs are
47/// clamped to `[0, total_shards)`.
48///
49/// Returns a sorted, deduplicated list of vShard IDs.
50pub fn array_vshards_for_slice(
51 slice_hilbert_ranges: &[(u64, u64)],
52 prefix_bits: u8,
53 total_shards: u32,
54) -> Result<Vec<u32>> {
55 if total_shards == 0 {
56 return Ok(Vec::new());
57 }
58 validate_prefix_bits(prefix_bits)?;
59
60 let stride = vshard_stride(prefix_bits);
61 let mut out: Vec<u32> = Vec::new();
62
63 // Unbounded slice: return one canonical primary vShard per bucket.
64 // Within a bucket all vShards serve the same Hilbert range, so fanning
65 // out to every member would multiply the result by `stride`.
66 if slice_hilbert_ranges.is_empty() {
67 let mut vshard_id: u32 = 0;
68 while vshard_id < total_shards && vshard_id < VSHARD_COUNT {
69 out.push(vshard_id);
70 vshard_id = vshard_id.saturating_add(stride);
71 if stride == 0 {
72 break;
73 }
74 }
75 return Ok(out);
76 }
77
78 for &(lo, hi) in slice_hilbert_ranges {
79 let lo_bucket = prefix_bucket(lo, prefix_bits);
80 // `hi` is inclusive, so we take the bucket that contains `hi`.
81 let hi_bucket = prefix_bucket(hi, prefix_bits);
82
83 for bucket in lo_bucket..=hi_bucket {
84 // One canonical primary vShard per bucket. All other vShards in
85 // `[vshard_start, vshard_start + stride)` serve identical Hilbert
86 // ranges; fanning out to all of them double-counts on aggregate.
87 let vshard_start = bucket_to_vshard(bucket, prefix_bits);
88 if vshard_start < total_shards {
89 out.push(vshard_start);
90 }
91 }
92 }
93
94 out.sort_unstable();
95 out.dedup();
96 Ok(out)
97}
98
99// ── Internal helpers ──────────────────────────────────────────────────────
100
101fn validate_prefix_bits(prefix_bits: u8) -> Result<()> {
102 if prefix_bits == 0 || prefix_bits > 16 {
103 return Err(ClusterError::Codec {
104 detail: format!("array routing: prefix_bits must be 1-16, got {prefix_bits}"),
105 });
106 }
107 Ok(())
108}
109
110/// Extract the top `prefix_bits` bits of a Hilbert prefix as a bucket index.
111fn prefix_bucket(hilbert_prefix: u64, prefix_bits: u8) -> u32 {
112 // Shift right so the top `prefix_bits` bits are in the low-order position.
113 let shift = 64u8.saturating_sub(prefix_bits);
114 (hilbert_prefix >> shift) as u32
115}
116
117/// Map a bucket index to the primary vShard in that bucket's range.
118fn bucket_to_vshard(bucket: u32, prefix_bits: u8) -> u32 {
119 let stride = vshard_stride(prefix_bits);
120 bucket.saturating_mul(stride)
121}
122
123/// Number of vShards per Hilbert bucket.
124///
125/// With `P` prefix bits there are `2^P` buckets and `VSHARD_COUNT`
126/// total vShards. Stride = `VSHARD_COUNT >> P`. When `P >= log2(VSHARD_COUNT)`
127/// the stride is 1 (one vShard per bucket or less).
128fn vshard_stride(prefix_bits: u8) -> u32 {
129 // VSHARD_COUNT is a power of two (1024 = 2^10).
130 // Right-shifting by prefix_bits gives the stride, floored at 1.
131 let shifted = VSHARD_COUNT >> (prefix_bits as u32);
132 shifted.max(1)
133}
134
135#[cfg(test)]
136mod tests {
137 use super::*;
138 use crate::routing::VSHARD_COUNT;
139
140 #[test]
141 fn tile_routing_invalid_prefix_bits() {
142 assert!(array_vshard_for_tile(0, 0).is_err());
143 assert!(array_vshard_for_tile(0, 17).is_err());
144 }
145
146 #[test]
147 fn tile_routing_top_bit_prefix1() {
148 // With prefix_bits=1: stride=512. Bucket 0 → vshard 0, bucket 1 → vshard 512.
149 let shard_low = array_vshard_for_tile(0x0000_0000_0000_0000, 1).unwrap();
150 let shard_high = array_vshard_for_tile(0x8000_0000_0000_0000, 1).unwrap();
151 assert_eq!(shard_low, 0);
152 assert_eq!(shard_high, 512);
153 }
154
155 #[test]
156 fn tile_routing_prefix10_is_direct() {
157 // With prefix_bits=10: stride=1, bucket == vshard_id.
158 // Top 10 bits of 0x0100_0000_0000_0000 = 0b0000_0000_01 = 1.
159 let vshard = array_vshard_for_tile(0x0040_0000_0000_0000, 10).unwrap();
160 // 0x0040... >> (64-10) = 0x0040... >> 54 = 1
161 assert_eq!(vshard, 1);
162 }
163
164 #[test]
165 fn tile_routing_prefix8_stride4() {
166 // With prefix_bits=8: stride=4. Bucket 0 → vshard 0, bucket 1 → vshard 4.
167 let shard0 = array_vshard_for_tile(0x0000_0000_0000_0000, 8).unwrap();
168 // Top 8 bits of 0x0100... = 1.
169 let shard1 = array_vshard_for_tile(0x0100_0000_0000_0000, 8).unwrap();
170 assert_eq!(shard0, 0);
171 assert_eq!(shard1, 4);
172 }
173
174 #[test]
175 fn tile_routing_output_in_vshard_range() {
176 // Every vshard returned must be < VSHARD_COUNT.
177 for bits in 1u8..=10 {
178 for &prefix in &[0u64, u64::MAX / 3, u64::MAX / 2, u64::MAX] {
179 let vshard = array_vshard_for_tile(prefix, bits).unwrap();
180 assert!(
181 vshard < VSHARD_COUNT,
182 "bits={bits} prefix={prefix:#x} → vshard={vshard} >= {VSHARD_COUNT}"
183 );
184 }
185 }
186 }
187
188 #[test]
189 fn slice_routing_zero_shards() {
190 let shards = array_vshards_for_slice(&[(0, u64::MAX)], 8, 0).unwrap();
191 assert!(shards.is_empty());
192 }
193
194 #[test]
195 fn slice_routing_empty_ranges_returns_all() {
196 // Empty range list = unbounded slice → one canonical primary per bucket.
197 // With prefix_bits=8 (stride=4) and total_shards=4, only primary 0 fits.
198 let shards = array_vshards_for_slice(&[], 8, 4).unwrap();
199 assert_eq!(shards, vec![0]);
200 }
201
202 #[test]
203 fn slice_routing_full_range_returns_all_active() {
204 // Full range covers all 256 buckets → one primary per bucket.
205 // total_shards=4 only fits primary 0.
206 let shards = array_vshards_for_slice(&[(0, u64::MAX)], 8, 4).unwrap();
207 assert_eq!(shards, vec![0]);
208 }
209
210 #[test]
211 fn slice_routing_single_bucket_prefix8() {
212 // Hilbert range [0, 0] covers only bucket 0 → primary vShard 0.
213 let shards = array_vshards_for_slice(&[(0, 0)], 8, 16).unwrap();
214 assert_eq!(shards, vec![0]);
215 }
216
217 #[test]
218 fn slice_routing_two_adjacent_buckets() {
219 // prefix_bits=8, stride=4. One canonical primary per bucket.
220 // Range 1 → bucket 0 → primary vShard 0.
221 // Range 2 → bucket 1 → primary vShard 4.
222 let shards = array_vshards_for_slice(
223 &[
224 (0x0000_0000_0000_0000, 0x00FF_FFFF_FFFF_FFFF),
225 (0x0100_0000_0000_0000, 0x01FF_FFFF_FFFF_FFFF),
226 ],
227 8,
228 1024,
229 )
230 .unwrap();
231 assert_eq!(shards, vec![0, 4]);
232 }
233
234 #[test]
235 fn slice_routing_clamped_to_total_shards() {
236 // With total_shards=2, only the primary for bucket 0 (= vShard 0) fits.
237 let shards = array_vshards_for_slice(&[(0, u64::MAX)], 8, 2).unwrap();
238 assert_eq!(shards, vec![0]);
239 }
240
241 #[test]
242 fn slice_routing_dedup_overlapping_ranges() {
243 // Two ranges that map to the same bucket → one canonical primary vShard.
244 let shards = array_vshards_for_slice(
245 &[
246 (0x0000_0000_0000_0000, 0x0000_0000_0000_0001),
247 (0x0000_0000_0000_0002, 0x0000_0000_0000_0003),
248 ],
249 8,
250 16,
251 )
252 .unwrap();
253 // Both ranges are in bucket 0 → primary vShard 0.
254 assert_eq!(shards, vec![0]);
255 }
256
257 #[test]
258 fn vshard_stride_values() {
259 // VSHARD_COUNT=1024.
260 assert_eq!(vshard_stride(1), 512);
261 assert_eq!(vshard_stride(8), 4);
262 assert_eq!(vshard_stride(10), 1);
263 assert_eq!(vshard_stride(16), 1); // floored at 1
264 }
265}