Skip to main content

nodedb_cluster/
routing.rs

1use std::collections::HashMap;
2
3use crate::error::{ClusterError, Result};
4
/// Total number of virtual shards (vShards), i.e. 2^10.
///
/// vShard IDs range over `0..VSHARD_COUNT`; collection names are hashed into
/// this range by `vshard_for_collection`.
pub const VSHARD_COUNT: u16 = 1 << 10;
7
8/// Maps vShards to Raft groups and Raft groups to nodes.
9///
10/// The 1024 vShards are divided into distinct Raft Groups
11/// (e.g., vShards 0-63 managed by Raft Group 1 across Nodes A, B, and C).
12///
13/// This table is the authoritative routing source. It is updated atomically
14/// via Raft state machine when:
15/// - A shard migration completes (Phase 3 atomic cut-over)
16/// - A Raft group membership changes
17/// - A node joins or decommissions
18#[derive(
19    Debug,
20    Clone,
21    serde::Serialize,
22    serde::Deserialize,
23    zerompk::ToMessagePack,
24    zerompk::FromMessagePack,
25)]
26pub struct RoutingTable {
27    /// vshard_id → raft_group_id.
28    vshard_to_group: Vec<u64>,
29    /// raft_group_id → (leader_node, [replica_nodes]).
30    group_members: HashMap<u64, GroupInfo>,
31}
32
33#[derive(
34    Debug,
35    Clone,
36    Default,
37    serde::Serialize,
38    serde::Deserialize,
39    zerompk::ToMessagePack,
40    zerompk::FromMessagePack,
41)]
42pub struct GroupInfo {
43    /// Current leader node ID (0 = no leader known).
44    pub leader: u64,
45    /// All voting members (including leader).
46    pub members: Vec<u64>,
47    /// Non-voting learner peers catching up to this group.
48    ///
49    /// Learners receive log replication but do not vote in elections and
50    /// are not counted toward the commit quorum. A learner transitions
51    /// into `members` via a second `PromoteLearner` conf-change once the
52    /// leader observes it has caught up.
53    #[serde(default)]
54    pub learners: Vec<u64>,
55}
56
57impl RoutingTable {
58    /// Create a routing table with uniform distribution of vShards across groups.
59    ///
60    /// `num_groups` Raft groups are created. vShards are distributed round-robin.
61    /// Each group initially contains `nodes_per_group` nodes from the `nodes` list.
62    pub fn uniform(num_groups: u64, nodes: &[u64], replication_factor: usize) -> Self {
63        assert!(!nodes.is_empty(), "need at least one node");
64        assert!(replication_factor > 0, "need at least RF=1");
65
66        let mut vshard_to_group = Vec::with_capacity(VSHARD_COUNT as usize);
67        for i in 0..VSHARD_COUNT {
68            vshard_to_group.push((i as u64) % num_groups);
69        }
70
71        let mut group_members = HashMap::new();
72        for group_id in 0..num_groups {
73            let rf = replication_factor.min(nodes.len());
74            let start = (group_id as usize * rf) % nodes.len();
75            let members: Vec<u64> = (0..rf).map(|i| nodes[(start + i) % nodes.len()]).collect();
76            let leader = members[0];
77            group_members.insert(
78                group_id,
79                GroupInfo {
80                    leader,
81                    members,
82                    learners: Vec::new(),
83                },
84            );
85        }
86
87        Self {
88            vshard_to_group,
89            group_members,
90        }
91    }
92
93    /// Look up which Raft group owns a vShard.
94    pub fn group_for_vshard(&self, vshard_id: u16) -> Result<u64> {
95        self.vshard_to_group
96            .get(vshard_id as usize)
97            .copied()
98            .ok_or(ClusterError::VShardNotMapped { vshard_id })
99    }
100
101    /// Look up the leader node for a vShard.
102    pub fn leader_for_vshard(&self, vshard_id: u16) -> Result<u64> {
103        let group_id = self.group_for_vshard(vshard_id)?;
104        let info = self
105            .group_members
106            .get(&group_id)
107            .ok_or(ClusterError::GroupNotFound { group_id })?;
108        Ok(info.leader)
109    }
110
111    /// Get group info.
112    pub fn group_info(&self, group_id: u64) -> Option<&GroupInfo> {
113        self.group_members.get(&group_id)
114    }
115
116    /// Update the leader for a Raft group.
117    pub fn set_leader(&mut self, group_id: u64, leader: u64) {
118        if let Some(info) = self.group_members.get_mut(&group_id) {
119            info.leader = leader;
120        }
121    }
122
123    /// Atomically reassign a vShard to a different Raft group.
124    /// Used during Phase 3 (atomic cut-over) of shard migration.
125    pub fn reassign_vshard(&mut self, vshard_id: u16, new_group_id: u64) {
126        if (vshard_id as usize) < self.vshard_to_group.len() {
127            self.vshard_to_group[vshard_id as usize] = new_group_id;
128        }
129    }
130
131    /// All vShards assigned to a given group.
132    pub fn vshards_for_group(&self, group_id: u64) -> Vec<u16> {
133        self.vshard_to_group
134            .iter()
135            .enumerate()
136            .filter(|(_, gid)| **gid == group_id)
137            .map(|(i, _)| i as u16)
138            .collect()
139    }
140
141    /// Number of Raft groups.
142    pub fn num_groups(&self) -> usize {
143        self.group_members.len()
144    }
145
146    /// All group IDs.
147    pub fn group_ids(&self) -> Vec<u64> {
148        self.group_members.keys().copied().collect()
149    }
150
151    /// Update the voting members of a Raft group (for membership changes).
152    pub fn set_group_members(&mut self, group_id: u64, members: Vec<u64>) {
153        if let Some(info) = self.group_members.get_mut(&group_id) {
154            info.members = members;
155        }
156    }
157
158    /// Update the learner list for a Raft group.
159    pub fn set_group_learners(&mut self, group_id: u64, learners: Vec<u64>) {
160        if let Some(info) = self.group_members.get_mut(&group_id) {
161            info.learners = learners;
162        }
163    }
164
165    /// Add a learner to a group if not already present. No-op if the peer
166    /// is already a voter or a learner.
167    pub fn add_group_learner(&mut self, group_id: u64, peer: u64) {
168        if let Some(info) = self.group_members.get_mut(&group_id)
169            && !info.members.contains(&peer)
170            && !info.learners.contains(&peer)
171        {
172            info.learners.push(peer);
173        }
174    }
175
176    /// Promote a learner to a voter within a group. Returns `true` if the
177    /// learner was found and promoted.
178    pub fn promote_group_learner(&mut self, group_id: u64, peer: u64) -> bool {
179        if let Some(info) = self.group_members.get_mut(&group_id)
180            && let Some(pos) = info.learners.iter().position(|&id| id == peer)
181        {
182            info.learners.remove(pos);
183            if !info.members.contains(&peer) {
184                info.members.push(peer);
185            }
186            return true;
187        }
188        false
189    }
190
191    /// Access the vshard-to-group mapping (for persistence / wire transfer).
192    pub fn vshard_to_group(&self) -> &[u64] {
193        &self.vshard_to_group
194    }
195
196    /// Access all group members (for persistence / wire transfer).
197    pub fn group_members(&self) -> &HashMap<u64, GroupInfo> {
198        &self.group_members
199    }
200
201    /// Reconstruct a RoutingTable from persisted data.
202    pub fn from_parts(vshard_to_group: Vec<u64>, group_members: HashMap<u64, GroupInfo>) -> Self {
203        Self {
204            vshard_to_group,
205            group_members,
206        }
207    }
208}
209
210/// Compute the primary vShard for a collection name.
211///
212/// Maps a collection name to its vShard ID.
213///
214/// Must match `VShardId::from_collection()` in the nodedb types module
215/// exactly — uses u16 accumulator with multiplier 31.
216pub fn vshard_for_collection(collection: &str) -> u16 {
217    let hash = collection
218        .as_bytes()
219        .iter()
220        .fold(0u16, |h, &b| h.wrapping_mul(31).wrapping_add(b as u16));
221    hash % VSHARD_COUNT
222}
223
/// Hash `key` with 64-bit FNV-1a, for deterministic key partitioning.
///
/// Each byte is XOR-ed into the state before the state is multiplied by the
/// FNV prime, with the multiplication wrapping on overflow. The distributed
/// join shuffle and shard split use this to assign keys to partitions.
///
/// Not intended for vShard routing; use `vshard_for_collection` for that.
pub fn fnv1a_hash(key: &str) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    key.bytes()
        .fold(OFFSET_BASIS, |state, byte| (state ^ u64::from(byte)).wrapping_mul(PRIME))
}
237
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    #[test]
    fn uniform_distribution() {
        let rt = RoutingTable::uniform(16, &[1, 2, 3], 3);
        assert_eq!(rt.num_groups(), 16);

        // 1024 vShards round-robin over 16 groups: exactly 64 each.
        for gid in 0..16 {
            let shards = rt.vshards_for_group(gid);
            assert_eq!(shards.len(), 64);
        }
    }

    #[test]
    fn uniform_spreads_replicas_and_clamps_rf() {
        // Windows of rf=2 over [1, 2, 3]: group 0 starts at 0, group 1 at 2.
        let rt = RoutingTable::uniform(2, &[1, 2, 3], 2);
        assert_eq!(rt.group_info(0).unwrap().members, vec![1, 2]);
        assert_eq!(rt.group_info(1).unwrap().members, vec![3, 1]);
        assert_eq!(rt.group_info(1).unwrap().leader, 3);

        // RF larger than the node count is clamped to the node count.
        let rt = RoutingTable::uniform(1, &[7, 8], 5);
        assert_eq!(rt.group_info(0).unwrap().members, vec![7, 8]);
    }

    #[test]
    #[should_panic]
    fn uniform_rejects_zero_groups() {
        let _ = RoutingTable::uniform(0, &[1], 1);
    }

    #[test]
    fn leader_lookup() {
        let rt = RoutingTable::uniform(4, &[10, 20, 30], 3);
        // vShard 0 maps to group 0, whose first member is the initial leader.
        assert_eq!(rt.leader_for_vshard(0).unwrap(), 10);
    }

    #[test]
    fn leader_lookup_missing_group() {
        let rt = RoutingTable::from_parts(vec![7; VSHARD_COUNT as usize], HashMap::new());
        assert_eq!(rt.group_for_vshard(0).unwrap(), 7);
        assert!(rt.leader_for_vshard(0).is_err());
    }

    #[test]
    fn reassign_vshard() {
        let mut rt = RoutingTable::uniform(4, &[1, 2, 3], 3);
        let old_group = rt.group_for_vshard(0).unwrap();
        let new_group = (old_group + 1) % 4;
        rt.reassign_vshard(0, new_group);
        assert_eq!(rt.group_for_vshard(0).unwrap(), new_group);
        assert!(rt.vshards_for_group(new_group).contains(&0));
        assert!(!rt.vshards_for_group(old_group).contains(&0));
    }

    #[test]
    fn reassign_out_of_range_is_ignored() {
        let mut rt = RoutingTable::uniform(4, &[1, 2, 3], 3);
        let before = rt.vshard_to_group().to_vec();
        rt.reassign_vshard(VSHARD_COUNT, 1);
        assert_eq!(rt.vshard_to_group(), before.as_slice());
    }

    #[test]
    fn set_leader() {
        let mut rt = RoutingTable::uniform(2, &[1, 2, 3], 3);
        rt.set_leader(0, 99);
        assert_eq!(rt.leader_for_vshard(0).unwrap(), 99);
    }

    #[test]
    fn learner_lifecycle() {
        let mut rt = RoutingTable::uniform(2, &[1, 2, 3], 3);
        rt.add_group_learner(0, 4);
        // Duplicates and existing voters are ignored.
        rt.add_group_learner(0, 4);
        rt.add_group_learner(0, 1);
        assert_eq!(rt.group_info(0).unwrap().learners, vec![4]);

        assert!(rt.promote_group_learner(0, 4));
        assert!(!rt.promote_group_learner(0, 4));
        let info = rt.group_info(0).unwrap();
        assert_eq!(info.members, vec![1, 2, 3, 4]);
        assert!(info.learners.is_empty());
    }

    #[test]
    fn vshard_not_mapped() {
        let rt = RoutingTable::uniform(2, &[1, 2], 2);
        // Every in-range vShard is mapped...
        assert!(rt.group_for_vshard(VSHARD_COUNT - 1).is_ok());
        // ...but IDs past the end of the table are not.
        assert!(rt.group_for_vshard(VSHARD_COUNT).is_err());
    }

    #[test]
    fn vshard_for_collection_reference_values() {
        assert_eq!(vshard_for_collection(""), 0);
        assert_eq!(vshard_for_collection("a"), 97);
        assert_eq!(vshard_for_collection("abc"), 98);
        assert!(vshard_for_collection(&"x".repeat(100)) < VSHARD_COUNT);
    }

    #[test]
    fn fnv1a_known_vectors() {
        assert_eq!(fnv1a_hash(""), 0xcbf29ce484222325);
        assert_eq!(fnv1a_hash("a"), 0xaf63dc4c8601ec8c);
        assert_eq!(fnv1a_hash("foobar"), 0x85944171f73967e8);
    }
}