//! `nodedb_cluster/routing.rs` — vShard → Raft group → node routing table.

1use std::collections::HashMap;
2
3use crate::error::{ClusterError, Result};
4
/// Number of virtual shards.
///
/// `RoutingTable::uniform` creates exactly this many vShard → group entries,
/// and `vshard_for_collection` reduces its hash modulo this value, so valid
/// vShard IDs produced by this module are `0..VSHARD_COUNT`.
pub const VSHARD_COUNT: u16 = 1024;
7
/// Maps vShards to Raft groups and Raft groups to nodes.
///
/// The 1024 vShards are divided into distinct Raft Groups
/// (e.g., vShards 0-63 managed by Raft Group 1 across Nodes A, B, and C).
///
/// This table is the authoritative routing source. It is updated atomically
/// via the Raft state machine when:
/// - A shard migration completes (Phase 3 atomic cut-over)
/// - A Raft group membership changes
/// - A node joins or decommissions
#[derive(
    Debug,
    Clone,
    serde::Serialize,
    serde::Deserialize,
    zerompk::ToMessagePack,
    zerompk::FromMessagePack,
)]
pub struct RoutingTable {
    /// vshard_id → raft_group_id. The vShard ID is the index into this vector.
    vshard_to_group: Vec<u64>,
    /// raft_group_id → [`GroupInfo`] (leader hint, voting members, learners).
    group_members: HashMap<u64, GroupInfo>,
}
32
/// Leader hint, voting members, and learners of a single Raft group.
///
/// The `Default` value has no known leader (`leader == 0`) and empty
/// member and learner lists.
#[derive(
    Debug,
    Clone,
    Default,
    serde::Serialize,
    serde::Deserialize,
    zerompk::ToMessagePack,
    zerompk::FromMessagePack,
)]
pub struct GroupInfo {
    /// Current leader node ID (0 = no leader known).
    pub leader: u64,
    /// All voting members (including leader).
    pub members: Vec<u64>,
    /// Non-voting learner peers catching up to this group.
    ///
    /// Learners receive log replication but do not vote in elections and
    /// are not counted toward the commit quorum. A learner transitions
    /// into `members` via a second `PromoteLearner` conf-change once the
    /// leader observes it has caught up.
    ///
    /// With `#[serde(default)]`, a serde payload that lacks this field
    /// deserializes to an empty list.
    #[serde(default)]
    pub learners: Vec<u64>,
}
56
57impl RoutingTable {
58    /// Create a routing table with uniform distribution of vShards across groups.
59    ///
60    /// `num_groups` Raft groups are created. vShards are distributed round-robin.
61    /// Each group initially contains `nodes_per_group` nodes from the `nodes` list.
62    pub fn uniform(num_groups: u64, nodes: &[u64], replication_factor: usize) -> Self {
63        assert!(!nodes.is_empty(), "need at least one node");
64        assert!(replication_factor > 0, "need at least RF=1");
65
66        let mut vshard_to_group = Vec::with_capacity(VSHARD_COUNT as usize);
67        for i in 0..VSHARD_COUNT {
68            vshard_to_group.push((i as u64) % num_groups);
69        }
70
71        let mut group_members = HashMap::new();
72        for group_id in 0..num_groups {
73            let rf = replication_factor.min(nodes.len());
74            let start = (group_id as usize * rf) % nodes.len();
75            let members: Vec<u64> = (0..rf).map(|i| nodes[(start + i) % nodes.len()]).collect();
76            let leader = members[0];
77            group_members.insert(
78                group_id,
79                GroupInfo {
80                    leader,
81                    members,
82                    learners: Vec::new(),
83                },
84            );
85        }
86
87        Self {
88            vshard_to_group,
89            group_members,
90        }
91    }
92
93    /// Look up which Raft group owns a vShard.
94    pub fn group_for_vshard(&self, vshard_id: u16) -> Result<u64> {
95        self.vshard_to_group
96            .get(vshard_id as usize)
97            .copied()
98            .ok_or(ClusterError::VShardNotMapped { vshard_id })
99    }
100
101    /// Look up the leader node for a vShard.
102    pub fn leader_for_vshard(&self, vshard_id: u16) -> Result<u64> {
103        let group_id = self.group_for_vshard(vshard_id)?;
104        let info = self
105            .group_members
106            .get(&group_id)
107            .ok_or(ClusterError::GroupNotFound { group_id })?;
108        Ok(info.leader)
109    }
110
111    /// Get group info.
112    pub fn group_info(&self, group_id: u64) -> Option<&GroupInfo> {
113        self.group_members.get(&group_id)
114    }
115
116    /// Update the leader for a Raft group.
117    pub fn set_leader(&mut self, group_id: u64, leader: u64) {
118        if let Some(info) = self.group_members.get_mut(&group_id) {
119            info.leader = leader;
120        }
121    }
122
123    /// Atomically reassign a vShard to a different Raft group.
124    /// Used during Phase 3 (atomic cut-over) of shard migration.
125    pub fn reassign_vshard(&mut self, vshard_id: u16, new_group_id: u64) {
126        if (vshard_id as usize) < self.vshard_to_group.len() {
127            self.vshard_to_group[vshard_id as usize] = new_group_id;
128        }
129    }
130
131    /// All vShards assigned to a given group.
132    pub fn vshards_for_group(&self, group_id: u64) -> Vec<u16> {
133        self.vshard_to_group
134            .iter()
135            .enumerate()
136            .filter(|(_, gid)| **gid == group_id)
137            .map(|(i, _)| i as u16)
138            .collect()
139    }
140
141    /// Number of Raft groups.
142    pub fn num_groups(&self) -> usize {
143        self.group_members.len()
144    }
145
146    /// All group IDs.
147    pub fn group_ids(&self) -> Vec<u64> {
148        self.group_members.keys().copied().collect()
149    }
150
151    /// Update the voting members of a Raft group (for membership changes).
152    pub fn set_group_members(&mut self, group_id: u64, members: Vec<u64>) {
153        if let Some(info) = self.group_members.get_mut(&group_id) {
154            info.members = members;
155        }
156    }
157
158    /// Remove a node from a group's voter and learner lists. If the
159    /// removed node was the current leader hint, the hint is cleared
160    /// so the next query drives a fresh discovery. Returns `true` if
161    /// the group existed and anything was actually removed.
162    ///
163    /// The caller is responsible for safety: dropping below the
164    /// configured replication factor must be gated by
165    /// `decommission::safety::check_can_decommission`.
166    pub fn remove_group_member(&mut self, group_id: u64, node_id: u64) -> bool {
167        let Some(info) = self.group_members.get_mut(&group_id) else {
168            return false;
169        };
170        let before_members = info.members.len();
171        let before_learners = info.learners.len();
172        info.members.retain(|&id| id != node_id);
173        info.learners.retain(|&id| id != node_id);
174        if info.leader == node_id {
175            info.leader = 0;
176        }
177        info.members.len() != before_members || info.learners.len() != before_learners
178    }
179
180    /// Update the learner list for a Raft group.
181    pub fn set_group_learners(&mut self, group_id: u64, learners: Vec<u64>) {
182        if let Some(info) = self.group_members.get_mut(&group_id) {
183            info.learners = learners;
184        }
185    }
186
187    /// Add a learner to a group if not already present. No-op if the peer
188    /// is already a voter or a learner.
189    pub fn add_group_learner(&mut self, group_id: u64, peer: u64) {
190        if let Some(info) = self.group_members.get_mut(&group_id)
191            && !info.members.contains(&peer)
192            && !info.learners.contains(&peer)
193        {
194            info.learners.push(peer);
195        }
196    }
197
198    /// Promote a learner to a voter within a group. Returns `true` if the
199    /// learner was found and promoted.
200    pub fn promote_group_learner(&mut self, group_id: u64, peer: u64) -> bool {
201        if let Some(info) = self.group_members.get_mut(&group_id)
202            && let Some(pos) = info.learners.iter().position(|&id| id == peer)
203        {
204            info.learners.remove(pos);
205            if !info.members.contains(&peer) {
206                info.members.push(peer);
207            }
208            return true;
209        }
210        false
211    }
212
213    /// Access the vshard-to-group mapping (for persistence / wire transfer).
214    pub fn vshard_to_group(&self) -> &[u64] {
215        &self.vshard_to_group
216    }
217
218    /// Access all group members (for persistence / wire transfer).
219    pub fn group_members(&self) -> &HashMap<u64, GroupInfo> {
220        &self.group_members
221    }
222
223    /// Reconstruct a RoutingTable from persisted data.
224    pub fn from_parts(vshard_to_group: Vec<u64>, group_members: HashMap<u64, GroupInfo>) -> Self {
225        Self {
226            vshard_to_group,
227            group_members,
228        }
229    }
230}
231
232/// Compute the primary vShard for a collection name.
233///
234/// Maps a collection name to its vShard ID.
235///
236/// Must match `VShardId::from_collection()` in the nodedb types module
237/// exactly — uses u16 accumulator with multiplier 31.
238pub fn vshard_for_collection(collection: &str) -> u16 {
239    let hash = collection
240        .as_bytes()
241        .iter()
242        .fold(0u16, |h, &b| h.wrapping_mul(31).wrapping_add(b as u16));
243    hash % VSHARD_COUNT
244}
245
/// FNV-1a 64-bit hash for deterministic key partitioning.
///
/// Starting from the offset basis, each byte of `key` is XORed into the
/// state, which is then multiplied (wrapping) by the FNV prime.
///
/// Used by distributed join shuffle and shard split to assign keys
/// to partitions. NOT for vShard routing — use `vshard_for_collection`
/// for that.
pub fn fnv1a_hash(key: &str) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    key.bytes()
        .fold(OFFSET_BASIS, |state, byte| (state ^ u64::from(byte)).wrapping_mul(PRIME))
}
259
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn uniform_distribution() {
        let rt = RoutingTable::uniform(16, &[1, 2, 3], 3);
        assert_eq!(rt.num_groups(), 16);

        // 1024 vShards spread round-robin over 16 groups: exactly 64 each.
        for gid in 0..16 {
            let shards = rt.vshards_for_group(gid);
            assert_eq!(shards.len(), 64);
        }
    }

    #[test]
    fn leader_lookup() {
        let rt = RoutingTable::uniform(4, &[10, 20, 30], 3);
        let leader = rt.leader_for_vshard(0).unwrap();
        assert!(leader > 0);
    }

    #[test]
    fn leader_lookup_fails_for_unknown_group() {
        let mut rt = RoutingTable::uniform(2, &[1, 2, 3], 3);
        // `reassign_vshard` does not validate the target group, so the
        // lookup has to report the dangling reference.
        rt.reassign_vshard(0, 99);
        assert!(rt.leader_for_vshard(0).is_err());
    }

    #[test]
    fn reassign_vshard() {
        let mut rt = RoutingTable::uniform(4, &[1, 2, 3], 3);
        let old_group = rt.group_for_vshard(0).unwrap();
        let new_group = (old_group + 1) % 4;
        rt.reassign_vshard(0, new_group);
        assert_eq!(rt.group_for_vshard(0).unwrap(), new_group);
    }

    #[test]
    fn set_leader() {
        let mut rt = RoutingTable::uniform(2, &[1, 2, 3], 3);
        rt.set_leader(0, 99);
        assert_eq!(rt.leader_for_vshard(0).unwrap(), 99);
    }

    #[test]
    fn remove_group_member_strips_voter_and_clears_leader() {
        let mut rt = RoutingTable::uniform(2, &[1, 2, 3], 3);
        rt.set_leader(0, 2);
        assert!(rt.remove_group_member(0, 2));
        let info = rt.group_info(0).unwrap();
        assert!(!info.members.contains(&2));
        assert_eq!(info.leader, 0, "leader hint should be cleared");
    }

    #[test]
    fn remove_group_member_strips_learner_only() {
        let mut rt = RoutingTable::uniform(2, &[1, 2, 3], 3);
        rt.add_group_learner(0, 9);
        assert!(rt.remove_group_member(0, 9));
        let info = rt.group_info(0).unwrap();
        assert!(!info.learners.contains(&9));
    }

    #[test]
    fn remove_group_member_unknown_group_returns_false() {
        let mut rt = RoutingTable::uniform(1, &[1, 2], 2);
        assert!(!rt.remove_group_member(99, 1));
    }

    #[test]
    fn add_group_learner_ignores_existing_peers() {
        let mut rt = RoutingTable::uniform(1, &[1, 2, 3], 3);
        rt.add_group_learner(0, 2); // already a voter
        rt.add_group_learner(0, 9);
        rt.add_group_learner(0, 9); // already a learner
        assert_eq!(rt.group_info(0).unwrap().learners, vec![9]);
    }

    #[test]
    fn promote_group_learner_moves_peer_to_voters() {
        let mut rt = RoutingTable::uniform(1, &[1, 2, 3], 3);
        rt.add_group_learner(0, 9);
        assert!(!rt.promote_group_learner(0, 8), "8 is not a learner");
        assert!(rt.promote_group_learner(0, 9));
        let info = rt.group_info(0).unwrap();
        assert!(info.members.contains(&9));
        assert!(info.learners.is_empty());
    }

    #[test]
    fn vshard_not_mapped() {
        let rt = RoutingTable::uniform(2, &[1, 2], 2);
        // Every ID in 0..VSHARD_COUNT is mapped...
        assert!(rt.group_for_vshard(VSHARD_COUNT - 1).is_ok());
        // ...and the first ID past the table is rejected.
        assert!(rt.group_for_vshard(VSHARD_COUNT).is_err());
    }

    #[test]
    fn hash_helpers_are_stable() {
        assert_eq!(vshard_for_collection(""), 0);
        assert_eq!(vshard_for_collection("a"), 97);
        assert_eq!(fnv1a_hash(""), 0xcbf29ce484222325);
        assert_eq!(fnv1a_hash("a"), 0xaf63dc4c8601ec8c);
    }
}