1use std::collections::HashMap;
4
5use nodedb_types::id::{DatabaseId, VShardId};
6
7use crate::error::{ClusterError, Result};
8
/// Total number of virtual shards, taken directly from [`VShardId::COUNT`] so
/// this module and the id type always agree on the size of the vShard space.
pub const VSHARD_COUNT: u32 = VShardId::COUNT;
15
/// Maps every virtual shard to a group and every group to its node set.
///
/// The table is plain data: it can be cloned, serialized with serde, and
/// encoded/decoded via the `zerompk` derives. Field order is part of the
/// derived encodings, so do not reorder fields casually.
#[derive(
    Debug,
    Clone,
    serde::Serialize,
    serde::Deserialize,
    zerompk::ToMessagePack,
    zerompk::FromMessagePack,
)]
pub struct RoutingTable {
    /// Indexed by vShard id; the value is the id of the group that owns it.
    vshard_to_group: Vec<u64>,
    /// Group id -> leader / member / learner information for that group.
    group_members: HashMap<u64, GroupInfo>,
}
40
/// Node set of a single group as tracked by the routing table.
///
/// Field order is part of the derived encodings, so do not reorder fields
/// casually.
#[derive(
    Debug,
    Clone,
    Default,
    serde::Serialize,
    serde::Deserialize,
    zerompk::ToMessagePack,
    zerompk::FromMessagePack,
)]
pub struct GroupInfo {
    /// Node id currently recorded as the group's leader.
    /// `RoutingTable::remove_group_member` resets this to `0` when the
    /// removed node was the leader, so `0` acts as "no leader known".
    pub leader: u64,
    /// Node ids of the group's members (the voters).
    pub members: Vec<u64>,
    /// Node ids of learners, i.e. peers that are tracked separately from
    /// `members` until promoted. `#[serde(default)]` lets serde input that
    /// lacks this field deserialize to an empty list.
    #[serde(default)]
    pub learners: Vec<u64>,
}
64
65impl RoutingTable {
66 pub fn uniform(num_groups: u64, nodes: &[u64], replication_factor: usize) -> Self {
77 assert!(!nodes.is_empty(), "need at least one node");
78 assert!(num_groups > 0, "need at least 1 data group");
79 assert!(replication_factor > 0, "need at least RF=1");
80
81 let mut vshard_to_group = Vec::with_capacity(VSHARD_COUNT as usize);
83 for i in 0..VSHARD_COUNT {
84 vshard_to_group.push(1 + (i as u64) % num_groups);
85 }
86
87 let mut group_members = HashMap::new();
88 for idx in 0..num_groups {
90 let group_id = idx + 1;
91 let rf = replication_factor.min(nodes.len());
92 let start = (idx as usize * rf) % nodes.len();
93 let members: Vec<u64> = (0..rf).map(|i| nodes[(start + i) % nodes.len()]).collect();
94 let leader = members[0];
95 group_members.insert(
96 group_id,
97 GroupInfo {
98 leader,
99 members,
100 learners: Vec::new(),
101 },
102 );
103 }
104 let rf = replication_factor.min(nodes.len());
106 let meta_members: Vec<u64> = (0..rf).map(|i| nodes[i % nodes.len()]).collect();
107 let meta_leader = meta_members[0];
108 group_members.insert(
109 0,
110 GroupInfo {
111 leader: meta_leader,
112 members: meta_members,
113 learners: Vec::new(),
114 },
115 );
116
117 Self {
118 vshard_to_group,
119 group_members,
120 }
121 }
122
123 pub fn group_for_vshard(&self, vshard_id: u32) -> Result<u64> {
125 self.vshard_to_group
126 .get(vshard_id as usize)
127 .copied()
128 .ok_or(ClusterError::VShardNotMapped { vshard_id })
129 }
130
131 pub fn leader_for_vshard(&self, vshard_id: u32) -> Result<u64> {
133 let group_id = self.group_for_vshard(vshard_id)?;
134 let info = self
135 .group_members
136 .get(&group_id)
137 .ok_or(ClusterError::GroupNotFound { group_id })?;
138 Ok(info.leader)
139 }
140
141 pub fn group_info(&self, group_id: u64) -> Option<&GroupInfo> {
143 self.group_members.get(&group_id)
144 }
145
146 pub fn set_leader(&mut self, group_id: u64, leader: u64) {
148 if let Some(info) = self.group_members.get_mut(&group_id) {
149 info.leader = leader;
150 }
151 }
152
153 pub fn reassign_vshard(&mut self, vshard_id: u32, new_group_id: u64) {
156 if (vshard_id as usize) < self.vshard_to_group.len() {
157 self.vshard_to_group[vshard_id as usize] = new_group_id;
158 }
159 }
160
161 pub fn vshards_for_group(&self, group_id: u64) -> Vec<u32> {
163 self.vshard_to_group
164 .iter()
165 .enumerate()
166 .filter(|(_, gid)| **gid == group_id)
167 .map(|(i, _)| i as u32)
168 .collect()
169 }
170
171 pub fn num_groups(&self) -> usize {
173 self.group_members.len()
174 }
175
176 pub fn group_ids(&self) -> Vec<u64> {
178 self.group_members.keys().copied().collect()
179 }
180
181 pub fn set_group_members(&mut self, group_id: u64, members: Vec<u64>) {
183 if let Some(info) = self.group_members.get_mut(&group_id) {
184 info.members = members;
185 }
186 }
187
188 pub fn remove_group_member(&mut self, group_id: u64, node_id: u64) -> bool {
197 let Some(info) = self.group_members.get_mut(&group_id) else {
198 return false;
199 };
200 let before_members = info.members.len();
201 let before_learners = info.learners.len();
202 info.members.retain(|&id| id != node_id);
203 info.learners.retain(|&id| id != node_id);
204 if info.leader == node_id {
205 info.leader = 0;
206 }
207 info.members.len() != before_members || info.learners.len() != before_learners
208 }
209
210 pub fn set_group_learners(&mut self, group_id: u64, learners: Vec<u64>) {
212 if let Some(info) = self.group_members.get_mut(&group_id) {
213 info.learners = learners;
214 }
215 }
216
217 pub fn add_group_learner(&mut self, group_id: u64, peer: u64) {
220 if let Some(info) = self.group_members.get_mut(&group_id)
221 && !info.members.contains(&peer)
222 && !info.learners.contains(&peer)
223 {
224 info.learners.push(peer);
225 }
226 }
227
228 pub fn promote_group_learner(&mut self, group_id: u64, peer: u64) -> bool {
231 if let Some(info) = self.group_members.get_mut(&group_id)
232 && let Some(pos) = info.learners.iter().position(|&id| id == peer)
233 {
234 info.learners.remove(pos);
235 if !info.members.contains(&peer) {
236 info.members.push(peer);
237 }
238 return true;
239 }
240 false
241 }
242
243 pub fn vshard_to_group(&self) -> &[u64] {
245 &self.vshard_to_group
246 }
247
248 pub fn group_members(&self) -> &HashMap<u64, GroupInfo> {
250 &self.group_members
251 }
252
253 pub fn from_parts(vshard_to_group: Vec<u64>, group_members: HashMap<u64, GroupInfo>) -> Self {
255 Self {
256 vshard_to_group,
257 group_members,
258 }
259 }
260}
261
262pub fn vshard_for_collection(database_id: DatabaseId, collection: &str) -> u32 {
272 VShardId::from_collection_in_database(database_id, collection).as_u32()
273}
274
/// Computes the 64-bit FNV-1a hash of the UTF-8 bytes of `key`.
///
/// Every byte is XOR-ed into the running hash, which is then multiplied by
/// the FNV prime with wrapping arithmetic. An empty key yields the offset
/// basis.
pub fn fnv1a_hash(key: &str) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    key.bytes().fold(OFFSET_BASIS, |hash, byte| {
        (hash ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
288
289pub fn partition_hash(placement_hash_id: crate::catalog::PlacementHashId, key: &str) -> u64 {
297 crate::catalog::placement_hash(placement_hash_id, key.as_bytes())
298}
299
#[cfg(test)]
mod tests {
    use super::*;

    /// A uniform table splits the vShard space evenly over the data groups
    /// and leaves group 0 without any vShards.
    #[test]
    fn uniform_distribution() {
        let rt = RoutingTable::uniform(16, &[1, 2, 3], 3);
        // 16 data groups plus group 0.
        assert_eq!(rt.num_groups(), 17);

        for gid in 1..=16u64 {
            let shards = rt.vshards_for_group(gid);
            assert_eq!(shards.len(), 64);
        }

        // Group 0 never owns vShards.
        assert_eq!(rt.vshards_for_group(0).len(), 0);
    }

    /// A freshly built table resolves a leader for a mapped vShard.
    #[test]
    fn leader_lookup() {
        let rt = RoutingTable::uniform(4, &[10, 20, 30], 3);
        let leader = rt.leader_for_vshard(0).unwrap();
        // 0 means "no leader"; a fresh table must point at a real node.
        assert!(leader > 0);
    }

    /// Reassigning a vShard changes the group it resolves to.
    #[test]
    fn reassign_vshard() {
        let mut rt = RoutingTable::uniform(4, &[1, 2, 3], 3);
        let old_group = rt.group_for_vshard(0).unwrap();
        // Pick any data group other than the current one.
        let new_group = if old_group < 4 { old_group + 1 } else { 1 };
        rt.reassign_vshard(0, new_group);
        assert_eq!(rt.group_for_vshard(0).unwrap(), new_group);
    }

    /// `set_leader` is visible through the vShard-level leader lookup.
    #[test]
    fn set_leader() {
        let mut rt = RoutingTable::uniform(2, &[1, 2, 3], 3);
        rt.set_leader(1, 99);
        // vShard 0 maps to group 1 in a uniform table.
        assert_eq!(rt.leader_for_vshard(0).unwrap(), 99);
    }

    /// Removing the current leader drops it from the members and clears the
    /// leader hint.
    #[test]
    fn remove_group_member_strips_voter_and_clears_leader() {
        let mut rt = RoutingTable::uniform(2, &[1, 2, 3], 3);
        rt.set_leader(1, 2);
        assert!(rt.remove_group_member(1, 2));
        let info = rt.group_info(1).unwrap();
        assert!(!info.members.contains(&2));
        assert_eq!(info.leader, 0, "leader hint should be cleared");
    }

    /// Removing a node that is only a learner still reports a removal.
    #[test]
    fn remove_group_member_strips_learner_only() {
        let mut rt = RoutingTable::uniform(2, &[1, 2, 3], 3);
        rt.add_group_learner(1, 9);
        assert!(rt.remove_group_member(1, 9));
        let info = rt.group_info(1).unwrap();
        assert!(!info.learners.contains(&9));
    }

    /// Removing from a group that does not exist is a no-op returning false.
    #[test]
    fn remove_group_member_unknown_group_returns_false() {
        let mut rt = RoutingTable::uniform(1, &[1, 2], 2);
        assert!(!rt.remove_group_member(99, 1));
    }

    /// Ids inside the vShard table resolve; ids past its end must surface
    /// `VShardNotMapped` instead of panicking or wrapping around.
    #[test]
    fn vshard_not_mapped() {
        let rt = RoutingTable::uniform(2, &[1, 2], 2);
        // Last id inside the table is still mapped.
        assert!(rt.group_for_vshard(1023).is_ok());

        // The table holds exactly VSHARD_COUNT entries, so the first id past
        // the end is the smallest unmapped one.
        assert!(matches!(
            rt.group_for_vshard(VSHARD_COUNT),
            Err(ClusterError::VShardNotMapped { vshard_id }) if vshard_id == VSHARD_COUNT
        ));
        assert!(rt.group_for_vshard(u32::MAX).is_err());
        // The leader lookup propagates the same failure.
        assert!(rt.leader_for_vshard(VSHARD_COUNT).is_err());
    }

    /// The two placement hash algorithms must not collapse to the same value.
    #[test]
    fn partition_hash_fnv1a_vs_xxhash3_differ() {
        use crate::catalog::PlacementHashId;
        let key = "some-partition-key";
        let fnv = partition_hash(PlacementHashId::Fnv1a, key);
        let xx3 = partition_hash(PlacementHashId::XxHash3, key);
        assert_ne!(fnv, xx3, "FNV-1a and XxHash3 must produce distinct values");
    }

    /// `vshard_for_collection` must stay in lock-step with the id type's own
    /// routing function for a spread of databases and collection names.
    #[test]
    fn vshard_for_collection_matches_types_layer() {
        for db_raw in [0u64, 1, 2, 1024, 999_999] {
            let db = DatabaseId::new(db_raw);
            for name in ["users", "orders", "events", "a", "this_is_a_long_name"] {
                assert_eq!(
                    vshard_for_collection(db, name),
                    VShardId::from_collection_in_database(db, name).as_u32(),
                    "drift detected: db={db_raw} collection={name}"
                );
            }
        }
    }

    /// The database id participates in routing: the same collection name in
    /// two databases lands on different vShards.
    #[test]
    fn vshard_for_collection_diverges_across_databases() {
        let v_default = vshard_for_collection(DatabaseId::DEFAULT, "users");
        let v_other = vshard_for_collection(DatabaseId::new(1024), "users");
        assert_ne!(
            v_default, v_other,
            "same collection name across databases must route independently"
        );
    }

    /// Hashing the same key twice with the same algorithm is stable.
    #[test]
    fn partition_hash_deterministic() {
        use crate::catalog::PlacementHashId;
        let key = "some-partition-key";
        assert_eq!(
            partition_hash(PlacementHashId::Fnv1a, key),
            partition_hash(PlacementHashId::Fnv1a, key)
        );
        assert_eq!(
            partition_hash(PlacementHashId::XxHash3, key),
            partition_hash(PlacementHashId::XxHash3, key)
        );
    }
}