// nodedb_cluster/routing.rs

// SPDX-License-Identifier: BUSL-1.1

use std::collections::HashMap;

use nodedb_types::id::{DatabaseId, VShardId};

use crate::error::{ClusterError, Result};

/// Number of virtual shards.
///
/// Re-exports [`VShardId::COUNT`] to keep the cluster routing layer and the
/// types-layer hash function locked to the same constant. Changing the count
/// in only one of the two crates would silently misroute every collection,
/// so this crate deliberately has no independent shard-count literal.
pub const VSHARD_COUNT: u32 = VShardId::COUNT;
15
/// Maps vShards to Raft groups and Raft groups to nodes.
///
/// The 1024 vShards are divided into distinct Raft Groups
/// (e.g., vShards 0-63 managed by Raft Group 1 across Nodes A, B, and C).
///
/// This table is the authoritative routing source. It is updated atomically
/// via Raft state machine when:
/// - A shard migration completes (Phase 3 atomic cut-over)
/// - A Raft group membership changes
/// - A node joins or decommissions
///
/// NOTE(review): this type derives both serde and zerompk codecs, so it is
/// persisted and/or sent on the wire. Renaming or reordering fields is
/// presumably a wire-format change — confirm codec compatibility before
/// touching the field list.
#[derive(
    Debug,
    Clone,
    serde::Serialize,
    serde::Deserialize,
    zerompk::ToMessagePack,
    zerompk::FromMessagePack,
)]
pub struct RoutingTable {
    /// vshard_id → raft_group_id. Indexed directly by vShard id;
    /// `uniform` fills it with `VSHARD_COUNT` entries.
    vshard_to_group: Vec<u64>,
    /// raft_group_id → (leader_node, [replica_nodes]).
    group_members: HashMap<u64, GroupInfo>,
}
40
/// Membership and leadership snapshot for a single Raft group.
#[derive(
    Debug,
    Clone,
    Default,
    serde::Serialize,
    serde::Deserialize,
    zerompk::ToMessagePack,
    zerompk::FromMessagePack,
)]
pub struct GroupInfo {
    /// Current leader node ID (0 = no leader known).
    pub leader: u64,
    /// All voting members (including leader).
    pub members: Vec<u64>,
    /// Non-voting learner peers catching up to this group.
    ///
    /// Learners receive log replication but do not vote in elections and
    /// are not counted toward the commit quorum. A learner transitions
    /// into `members` via a second `PromoteLearner` conf-change once the
    /// leader observes it has caught up.
    ///
    /// `#[serde(default)]` fills this with an empty list when the field is
    /// absent from the input — presumably so payloads encoded before the
    /// field existed still deserialize; confirm against snapshot history.
    #[serde(default)]
    pub learners: Vec<u64>,
}
64
65impl RoutingTable {
66    /// Create a routing table with uniform distribution of vShards across data groups.
67    ///
68    /// `num_groups` is the number of **data** Raft groups. vShards are distributed
69    /// round-robin across groups `1..=num_groups`. Group 0 is the metadata group and
70    /// is always included in `group_members` but is never assigned any vShards —
71    /// it is accessed only via `propose_to_metadata_group`, never via
72    /// `propose(vshard_id, data)`.
73    ///
74    /// Each data group initially contains `replication_factor` nodes from `nodes`.
75    /// The metadata group (0) receives the same membership as the first data group.
76    pub fn uniform(num_groups: u64, nodes: &[u64], replication_factor: usize) -> Self {
77        assert!(!nodes.is_empty(), "need at least one node");
78        assert!(num_groups > 0, "need at least 1 data group");
79        assert!(replication_factor > 0, "need at least RF=1");
80
81        // vShards map to data groups 1..=num_groups, skipping group 0 (metadata).
82        let mut vshard_to_group = Vec::with_capacity(VSHARD_COUNT as usize);
83        for i in 0..VSHARD_COUNT {
84            vshard_to_group.push(1 + (i as u64) % num_groups);
85        }
86
87        let mut group_members = HashMap::new();
88        // Data groups: 1..=num_groups.
89        for idx in 0..num_groups {
90            let group_id = idx + 1;
91            let rf = replication_factor.min(nodes.len());
92            let start = (idx as usize * rf) % nodes.len();
93            let members: Vec<u64> = (0..rf).map(|i| nodes[(start + i) % nodes.len()]).collect();
94            let leader = members[0];
95            group_members.insert(
96                group_id,
97                GroupInfo {
98                    leader,
99                    members,
100                    learners: Vec::new(),
101                },
102            );
103        }
104        // Metadata group 0: same membership as the first data group.
105        let rf = replication_factor.min(nodes.len());
106        let meta_members: Vec<u64> = (0..rf).map(|i| nodes[i % nodes.len()]).collect();
107        let meta_leader = meta_members[0];
108        group_members.insert(
109            0,
110            GroupInfo {
111                leader: meta_leader,
112                members: meta_members,
113                learners: Vec::new(),
114            },
115        );
116
117        Self {
118            vshard_to_group,
119            group_members,
120        }
121    }
122
123    /// Look up which Raft group owns a vShard.
124    pub fn group_for_vshard(&self, vshard_id: u32) -> Result<u64> {
125        self.vshard_to_group
126            .get(vshard_id as usize)
127            .copied()
128            .ok_or(ClusterError::VShardNotMapped { vshard_id })
129    }
130
131    /// Look up the leader node for a vShard.
132    pub fn leader_for_vshard(&self, vshard_id: u32) -> Result<u64> {
133        let group_id = self.group_for_vshard(vshard_id)?;
134        let info = self
135            .group_members
136            .get(&group_id)
137            .ok_or(ClusterError::GroupNotFound { group_id })?;
138        Ok(info.leader)
139    }
140
141    /// Get group info.
142    pub fn group_info(&self, group_id: u64) -> Option<&GroupInfo> {
143        self.group_members.get(&group_id)
144    }
145
146    /// Update the leader for a Raft group.
147    pub fn set_leader(&mut self, group_id: u64, leader: u64) {
148        if let Some(info) = self.group_members.get_mut(&group_id) {
149            info.leader = leader;
150        }
151    }
152
153    /// Atomically reassign a vShard to a different Raft group.
154    /// Used during Phase 3 (atomic cut-over) of shard migration.
155    pub fn reassign_vshard(&mut self, vshard_id: u32, new_group_id: u64) {
156        if (vshard_id as usize) < self.vshard_to_group.len() {
157            self.vshard_to_group[vshard_id as usize] = new_group_id;
158        }
159    }
160
161    /// All vShards assigned to a given group.
162    pub fn vshards_for_group(&self, group_id: u64) -> Vec<u32> {
163        self.vshard_to_group
164            .iter()
165            .enumerate()
166            .filter(|(_, gid)| **gid == group_id)
167            .map(|(i, _)| i as u32)
168            .collect()
169    }
170
171    /// Number of Raft groups.
172    pub fn num_groups(&self) -> usize {
173        self.group_members.len()
174    }
175
176    /// All group IDs.
177    pub fn group_ids(&self) -> Vec<u64> {
178        self.group_members.keys().copied().collect()
179    }
180
181    /// Update the voting members of a Raft group (for membership changes).
182    pub fn set_group_members(&mut self, group_id: u64, members: Vec<u64>) {
183        if let Some(info) = self.group_members.get_mut(&group_id) {
184            info.members = members;
185        }
186    }
187
188    /// Remove a node from a group's voter and learner lists. If the
189    /// removed node was the current leader hint, the hint is cleared
190    /// so the next query drives a fresh discovery. Returns `true` if
191    /// the group existed and anything was actually removed.
192    ///
193    /// The caller is responsible for safety: dropping below the
194    /// configured replication factor must be gated by
195    /// `decommission::safety::check_can_decommission`.
196    pub fn remove_group_member(&mut self, group_id: u64, node_id: u64) -> bool {
197        let Some(info) = self.group_members.get_mut(&group_id) else {
198            return false;
199        };
200        let before_members = info.members.len();
201        let before_learners = info.learners.len();
202        info.members.retain(|&id| id != node_id);
203        info.learners.retain(|&id| id != node_id);
204        if info.leader == node_id {
205            info.leader = 0;
206        }
207        info.members.len() != before_members || info.learners.len() != before_learners
208    }
209
210    /// Update the learner list for a Raft group.
211    pub fn set_group_learners(&mut self, group_id: u64, learners: Vec<u64>) {
212        if let Some(info) = self.group_members.get_mut(&group_id) {
213            info.learners = learners;
214        }
215    }
216
217    /// Add a learner to a group if not already present. No-op if the peer
218    /// is already a voter or a learner.
219    pub fn add_group_learner(&mut self, group_id: u64, peer: u64) {
220        if let Some(info) = self.group_members.get_mut(&group_id)
221            && !info.members.contains(&peer)
222            && !info.learners.contains(&peer)
223        {
224            info.learners.push(peer);
225        }
226    }
227
228    /// Promote a learner to a voter within a group. Returns `true` if the
229    /// learner was found and promoted.
230    pub fn promote_group_learner(&mut self, group_id: u64, peer: u64) -> bool {
231        if let Some(info) = self.group_members.get_mut(&group_id)
232            && let Some(pos) = info.learners.iter().position(|&id| id == peer)
233        {
234            info.learners.remove(pos);
235            if !info.members.contains(&peer) {
236                info.members.push(peer);
237            }
238            return true;
239        }
240        false
241    }
242
243    /// Access the vshard-to-group mapping (for persistence / wire transfer).
244    pub fn vshard_to_group(&self) -> &[u64] {
245        &self.vshard_to_group
246    }
247
248    /// Access all group members (for persistence / wire transfer).
249    pub fn group_members(&self) -> &HashMap<u64, GroupInfo> {
250        &self.group_members
251    }
252
253    /// Reconstruct a RoutingTable from persisted data.
254    pub fn from_parts(vshard_to_group: Vec<u64>, group_members: HashMap<u64, GroupInfo>) -> Self {
255        Self {
256            vshard_to_group,
257            group_members,
258        }
259    }
260}
261
262/// Compute the primary vShard for a `(database, collection)` pair.
263///
264/// Delegates to [`VShardId::from_collection_in_database`] so the cluster
265/// routing layer and the types-layer hash function cannot drift. The database
266/// id is folded into the hash so the same collection name in two different
267/// databases routes to independent vShards — required for multi-database
268/// isolation. Passing only the collection name (without `db`) would route
269/// every database through the same vShard space and silently corrupt
270/// cross-database deployments; the parameter is mandatory by design.
271pub fn vshard_for_collection(database_id: DatabaseId, collection: &str) -> u32 {
272    VShardId::from_collection_in_database(database_id, collection).as_u32()
273}
274
/// FNV-1a 64-bit hash for deterministic key partitioning.
///
/// Used by distributed join shuffle and shard split to assign keys
/// to partitions. NOT for vShard routing — use `vshard_for_collection`
/// for that.
pub fn fnv1a_hash(key: &str) -> u64 {
    // Standard FNV-1a 64-bit parameters (offset basis and prime).
    const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;
    key.bytes()
        .fold(OFFSET_BASIS, |acc, byte| (acc ^ u64::from(byte)).wrapping_mul(PRIME))
}
288
289/// Hash `key` using the algorithm recorded in the cluster's [`PlacementHashId`].
290///
291/// Callers load the `PlacementHashId` from `ClusterSettings` once at
292/// startup and pass it through every shard-split / shuffle operation.
293/// The underlying implementations live in
294/// [`crate::catalog::placement_hash`]; this function is the routing-layer
295/// entry point so callers do not need to import the catalog module directly.
296pub fn partition_hash(placement_hash_id: crate::catalog::PlacementHashId, key: &str) -> u64 {
297    crate::catalog::placement_hash(placement_hash_id, key.as_bytes())
298}
299
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn uniform_distribution() {
        // 16 data groups (1..=16) own vShards; the metadata group 0 is extra.
        let table = RoutingTable::uniform(16, &[1, 2, 3], 3);
        // group_members holds 16 data groups + 1 metadata group.
        assert_eq!(table.num_groups(), 17);

        // 1024 vShards split evenly: 64 per data group.
        for group in 1..=16u64 {
            assert_eq!(table.vshards_for_group(group).len(), 64);
        }

        // Group 0 never owns vShards.
        assert_eq!(table.vshards_for_group(0).len(), 0);
    }

    #[test]
    fn leader_lookup() {
        let table = RoutingTable::uniform(4, &[10, 20, 30], 3);
        // vShard 0 maps to data group 1, which carries a valid leader hint.
        let leader = table.leader_for_vshard(0).unwrap();
        assert!(leader > 0);
    }

    #[test]
    fn reassign_vshard() {
        let mut table = RoutingTable::uniform(4, &[1, 2, 3], 3);
        let old_group = table.group_for_vshard(0).unwrap();
        // Move vShard 0 to a different data group, wrapping past the last.
        let new_group = if old_group < 4 { old_group + 1 } else { 1 };
        table.reassign_vshard(0, new_group);
        assert_eq!(table.group_for_vshard(0).unwrap(), new_group);
    }

    #[test]
    fn set_leader() {
        let mut table = RoutingTable::uniform(2, &[1, 2, 3], 3);
        // Data group 1 owns vShard 0; repoint its leader hint.
        table.set_leader(1, 99);
        assert_eq!(table.leader_for_vshard(0).unwrap(), 99);
    }

    #[test]
    fn remove_group_member_strips_voter_and_clears_leader() {
        let mut table = RoutingTable::uniform(2, &[1, 2, 3], 3);
        // Make node 2 the leader of data group 1, then remove it.
        table.set_leader(1, 2);
        assert!(table.remove_group_member(1, 2));
        let info = table.group_info(1).unwrap();
        assert!(!info.members.contains(&2));
        assert_eq!(info.leader, 0, "leader hint should be cleared");
    }

    #[test]
    fn remove_group_member_strips_learner_only() {
        let mut table = RoutingTable::uniform(2, &[1, 2, 3], 3);
        table.add_group_learner(1, 9);
        assert!(table.remove_group_member(1, 9));
        assert!(!table.group_info(1).unwrap().learners.contains(&9));
    }

    #[test]
    fn remove_group_member_unknown_group_returns_false() {
        let mut table = RoutingTable::uniform(1, &[1, 2], 2);
        assert!(!table.remove_group_member(99, 1));
    }

    #[test]
    fn vshard_not_mapped() {
        let table = RoutingTable::uniform(2, &[1, 2], 2);
        // Every one of the 1024 vShards is mapped, including the last index.
        assert!(table.group_for_vshard(1023).is_ok());
    }

    #[test]
    fn partition_hash_fnv1a_vs_xxhash3_differ() {
        use crate::catalog::PlacementHashId;
        let key = "some-partition-key";
        assert_ne!(
            partition_hash(PlacementHashId::Fnv1a, key),
            partition_hash(PlacementHashId::XxHash3, key),
            "FNV-1a and XxHash3 must produce distinct values"
        );
    }

    #[test]
    fn vshard_for_collection_matches_types_layer() {
        // The cluster routing layer MUST agree with the types-layer hash
        // for every (database, collection) pair; drift silently misroutes
        // data across nodes. This test pins the contract.
        let collections = ["users", "orders", "events", "a", "this_is_a_long_name"];
        for db_raw in [0u64, 1, 2, 1024, 999_999] {
            let db = DatabaseId::new(db_raw);
            for name in collections {
                let expected = VShardId::from_collection_in_database(db, name).as_u32();
                assert_eq!(
                    vshard_for_collection(db, name),
                    expected,
                    "drift detected: db={db_raw} collection={name}"
                );
            }
        }
    }

    #[test]
    fn vshard_for_collection_diverges_across_databases() {
        // Same collection name in different databases must route to
        // different vShards (probabilistic; "users" is a canonical
        // example whose hashes are known to differ across DEFAULT and
        // DatabaseId(1024)).
        let in_default = vshard_for_collection(DatabaseId::DEFAULT, "users");
        let in_other = vshard_for_collection(DatabaseId::new(1024), "users");
        assert_ne!(
            in_default, in_other,
            "same collection name across databases must route independently"
        );
    }

    #[test]
    fn partition_hash_deterministic() {
        use crate::catalog::PlacementHashId;
        let key = "some-partition-key";
        assert_eq!(
            partition_hash(PlacementHashId::Fnv1a, key),
            partition_hash(PlacementHashId::Fnv1a, key)
        );
        assert_eq!(
            partition_hash(PlacementHashId::XxHash3, key),
            partition_hash(PlacementHashId::XxHash3, key)
        );
    }
}